[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+                         private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+    * Configures the unique key fields of the [[Table]] to write.
+    * The method is called after [[TableSink.configure()]].
+    *
+    * The keys array might be empty, if the table consists of a single (updated) record.
+    * If the table does not have a key and is append-only, the keys attribute is null.
+    *
+    * @param keys the field names of the table's keys, an empty array if the table has a single
+    *             row, and null if the table is append-only and has no key.
+    */
+  override def setKeyFields(keys: Array[String]): Unit = {
+    if (keys == null) {
+      throw new IllegalArgumentException("keys can't be null!")
+    }
+    this.keys = keys
+  }
+
+  /**
+    * Specifies whether the [[Table]] to write is append-only or not.
+    *
+    * @param isAppendOnly true if the table is append-only, false otherwise.
+    */
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+    if (isAppendOnly) {
+      throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only tables " +
+        "as the table would grow infinitely")
+    }
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+    val keyIndices = keys.map(getFieldNames.indexOf(_))
+    val keyTypes = keyIndices.map(getFieldTypes(_))
+
+    val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+    val processFunction = new QueryableStateProcessFunction(
+      namePrefix,
+      queryConfig,
+      keys,
+      getFieldNames,
+      getFieldTypes)
+
+    dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+      .process(processFunction)
--- End diff --

I'm waiting for another [PR](https://github.com/apache/flink/pull/5680) to be merged.


---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398166#comment-16398166
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

I'm waiting for another [PR](https://github.com/apache/flink/pull/5680) to be merged.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968

[jira] [Commented] (FLINK-8919) Add KeyedProcessFunctionWIthCleanupState

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398162#comment-16398162
 ] 

ASF GitHub Bot commented on FLINK-8919:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
@bowenli86 @fhueske UT added.

This class is a utility class that can serve as the base class for many process 
function implementations in the Flink Table API. Its counterpart for the legacy 
`ProcessFunction` interface, `ProcessFunctionWithCleanupState`, is already extended 
by many implementations. In fact, my other 
[PR](https://github.com/apache/flink/pull/5688) depends on this.


> Add KeyedProcessFunctionWIthCleanupState
> 
>
> Key: FLINK-8919
> URL: https://issues.apache.org/jira/browse/FLINK-8919
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Minor
> Fix For: 1.6.0
>
>
> ProcessFunctionWithCleanupState is a useful tool and I think we also need one 
> for the new KeyedProcessFunction api.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-13 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
@bowenli86 @fhueske UT added.

This class is a utility class that can serve as the base class for many process 
function implementations in the Flink Table API. Its counterpart for the legacy 
`ProcessFunction` interface, `ProcessFunctionWithCleanupState`, is already extended 
by many implementations. In fact, my other 
[PR](https://github.com/apache/flink/pull/5688) depends on this.
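For context, a minimal sketch of what such a cleanup-state base class for the new `KeyedProcessFunction` API could look like is shown below. This is an illustration under assumptions only: the member and method names (`cleanupTimeState`, `registerProcessingCleanupTimer`, ...) are invented here for clarity and need not match the PR. The idea is that subclasses register a processing-time timer based on the configured idle-state retention and clear their state when it fires.

```
import java.lang.{Long => JLong}

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.table.api.StreamQueryConfig

// Sketch only - not the PR's actual implementation.
abstract class KeyedProcessFunctionWithCleanupStateSketch[K, I, O](queryConfig: StreamQueryConfig)
  extends KeyedProcessFunction[K, I, O] {

  protected val stateCleaningEnabled: Boolean =
    queryConfig.getMinIdleStateRetentionTime > 0

  // Timestamp of the currently registered cleanup timer for the current key.
  protected var cleanupTimeState: ValueState[JLong] = _

  // Intended to be called from open() by subclasses.
  protected def initCleanupTimeState(stateName: String): Unit = {
    if (stateCleaningEnabled) {
      cleanupTimeState = getRuntimeContext.getState(
        new ValueStateDescriptor[JLong](stateName, Types.LONG))
    }
  }

  // Registers (or pushes back) a processing-time timer that fires once the
  // maximum idle-state retention time has passed without new records for this key.
  protected def registerProcessingCleanupTimer(
      ctx: KeyedProcessFunction[K, I, O]#Context,
      currentTime: Long): Unit = {
    if (stateCleaningEnabled) {
      val cleanupTime = currentTime + queryConfig.getMaxIdleStateRetentionTime
      val registeredTime = cleanupTimeState.value()
      if (registeredTime == null || cleanupTime > registeredTime) {
        ctx.timerService().registerProcessingTimeTimer(cleanupTime)
        cleanupTimeState.update(cleanupTime)
      }
    }
  }
}
```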


---


[jira] [Commented] (FLINK-8862) Support HBase snapshot read

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398094#comment-16398094
 ] 

ASF GitHub Bot commented on FLINK-8862:
---

Github user neoremind commented on the issue:

https://github.com/apache/flink/pull/5639
  
@fhueske Thanks for your response. I understand the situation. Please take your 
time; I hope this PR can be reviewed in the future and help the people who need it. 
Thanks!


> Support HBase snapshot read
> ---
>
> Key: FLINK-8862
> URL: https://issues.apache.org/jira/browse/FLINK-8862
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Xu Zhang
>Priority: Major
> Attachments: FLINK-8862-Design-Class-Diagram.png, 
> FLINK-8862-DesignDoc.pdf
>
>
> The flink-hbase connector only supports reading/scanning HBase through the region server 
> scanner. There is also a snapshot-based scanning solution: just as Hadoop provides two ways 
> to scan HBase (TableInputFormat and TableSnapshotInputFormat), it would be great if Flink 
> supported both, to cover a wider range of use cases and give users an alternative.
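
For readers unfamiliar with the snapshot path mentioned above, a rough sketch of reading an HBase snapshot through Flink's Hadoop compatibility layer (independent of the connector proposed in this PR) might look as follows. The snapshot name, restore directory, and table layout are placeholders, and the `HadoopInputs` helper from flink-hadoop-compatibility is assumed to be on the classpath:

{code:scala}
import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
import org.apache.hadoop.mapreduce.Job

object HBaseSnapshotReadSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Configure the MapReduce job that TableSnapshotInputFormat reads its settings from.
    val job = Job.getInstance(HBaseConfiguration.create())
    // "my_snapshot" and the restore directory are placeholders.
    TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/hbase-restore"))

    // Wrap the Hadoop input format so it can be used as a Flink data source.
    val snapshotRows = env.createInput(
      HadoopInputs.createHadoopInput(
        new TableSnapshotInputFormat,
        classOf[ImmutableBytesWritable],
        classOf[Result],
        job))

    // Count the rows in the snapshot as a trivial sanity check.
    println(snapshotRows.count())
  }
}
{code}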



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5639: [FLINK-8862] [HBase] Support HBase snapshot read

2018-03-13 Thread neoremind
Github user neoremind commented on the issue:

https://github.com/apache/flink/pull/5639
  
@fhueske Thanks for your response. I understand the situation. Please take your 
time; I hope this PR can be reviewed in the future and help the people who need it. 
Thanks!


---


[jira] [Commented] (FLINK-8938) Not remove job graph during job master failover

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398081#comment-16398081
 ] 

ASF GitHub Bot commented on FLINK-8938:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5693

[FLINK-8938] [runtime] not remove job graph during job master failover


## What is the purpose of the change

*This pull request fixes a bug where, during job master failover, the job graph may be 
deleted so that the next job master can no longer recover the job.*


## Verifying this change

*(Please pick either of the following options)*

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5693


commit 2fc5ec8795b5a53ce0f653a0e0ad5c1346e826c8
Author: shuai.xus 
Date:   2018-03-14T04:16:55Z

[FLINK-8938] [runtime] not remove job graph during job master failover




> Not  remove job graph during job master failover
> 
>
> Key: FLINK-8938
> URL: https://issues.apache.org/jira/browse/FLINK-8938
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>
> When the job master fails over, the new dispatcher should not delete the job graph 
> if it fails to start the job manager runner; otherwise, the other dispatchers 
> cannot recover it.
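
The intent behind the fix can be illustrated with a small sketch. All names below are hypothetical (this is not the actual Dispatcher code); the point is simply that the persisted job graph is only removed once the job reaches a globally terminal state, never because starting a JobManagerRunner failed:

{code:scala}
// Illustrative sketch only - hypothetical types and names.
object JobGraphRetentionSketch {

  // Hypothetical stand-in for the HA job graph store.
  trait JobGraphStore {
    def removeJobGraph(jobId: String): Unit
  }

  class DispatcherSketch(store: JobGraphStore) {

    def onJobManagerRunnerStartFailure(jobId: String, cause: Throwable): Unit = {
      // Keep the job graph so another dispatcher (or a later leader) can still recover the job.
      println(s"Could not start the runner for job $jobId, keeping its job graph: ${cause.getMessage}")
    }

    def onJobReachedGloballyTerminalState(jobId: String): Unit = {
      // Only now is it safe to drop the job graph from the HA store.
      store.removeJobGraph(jobId)
    }
  }
}
{code}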



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5693: [FLINK-8938] [runtime] not remove job graph during...

2018-03-13 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5693

[FLINK-8938] [runtime] not remove job graph during job master failover


## What is the purpose of the change

*This pull request fixes a bug where, during job master failover, the job graph may be 
deleted so that the next job master can no longer recover the job.*


## Verifying this change

*(Please pick either of the following options)*

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5693.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5693


commit 2fc5ec8795b5a53ce0f653a0e0ad5c1346e826c8
Author: shuai.xus 
Date:   2018-03-14T04:16:55Z

[FLINK-8938] [runtime] not remove job graph during job master failover




---


[jira] [Created] (FLINK-8938) Not remove job graph during job master failover

2018-03-13 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8938:
---

 Summary: Not  remove job graph during job master failover
 Key: FLINK-8938
 URL: https://issues.apache.org/jira/browse/FLINK-8938
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


When the job master fails over, the new dispatcher should not delete the job graph if 
it fails to start the job manager runner; otherwise, the other dispatchers cannot 
recover it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8915) CheckpointingStatisticsHandler fails to return PendingCheckpointStats

2018-03-13 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398055#comment-16398055
 ] 

vinoyang commented on FLINK-8915:
-

Hi [~gjy], I just analyzed this issue. The root cause is CheckpointStatsHistory's 
{code:java}
/** Array of checkpointsArray. Writes go against this array. */
private transient AbstractCheckpointStats[] checkpointsArray;
{code}
I found that a pending checkpoint can also be added to this array through the method:
{code:java}
void addInProgressCheckpoint(PendingCheckpointStats pending) {
{code}
Then, when the checkpoint transitions to a terminal state (completed or failed), this callback 
method is triggered:
{code:java}
boolean replacePendingCheckpointById(AbstractCheckpointStats completedOrFailed) {
{code}
Other components access the history through the *createSnapshot* method, which returns an 
instance of *CheckpointStatsHistory*. However, this method creates the instance from the 
checkpointsArray, so the snapshot may still contain pending checkpoints.

In my opinion, we should settle the semantics of *CheckpointStatsHistory*: when 
*createSnapshot* is called, may the result contain pending checkpoints or not? If not, we can 
filter pending checkpoints by status. If yes, the handler should check the status and log an 
error or handle the case in some other way.

What's your opinion?
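
As an illustration of the first option (filtering by status when building the snapshot), a minimal sketch could look like the following. The surrounding method is an assumption made for illustration; only AbstractCheckpointStats#getStatus() and CheckpointStatsStatus are existing Flink classes:

{code:scala}
import org.apache.flink.runtime.checkpoint.{AbstractCheckpointStats, CheckpointStatsStatus}

object CheckpointSnapshotFilterSketch {
  // Keep only checkpoints that already reached a terminal state (completed or failed),
  // so a snapshot never exposes a PendingCheckpointStats to the REST handlers.
  def terminalCheckpointsOnly(
      checkpoints: Array[AbstractCheckpointStats]): Array[AbstractCheckpointStats] = {
    checkpoints.filter(_.getStatus != CheckpointStatsStatus.IN_PROGRESS)
  }
}
{code}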

 

 

 

 

 

> CheckpointingStatisticsHandler fails to return PendingCheckpointStats 
> --
>
> Key: FLINK-8915
> URL: https://issues.apache.org/jira/browse/FLINK-8915
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> {noformat}
> 2018-03-10 21:47:52,487 ERROR 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
>   - Implementation error: Unhandled exception.
> java.lang.IllegalArgumentException: Given checkpoint stats object of type 
> org.apache.flink.runtime.checkpoint.PendingCheckpointStats cannot be 
> converted.
>   at 
> org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.generateCheckpointStatistics(CheckpointStatistics.java:276)
>   at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:146)
>   at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:54)
>   at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:81)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8915) CheckpointingStatisticsHandler fails to return PendingCheckpointStats

2018-03-13 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8915:
---

Assignee: vinoyang

> CheckpointingStatisticsHandler fails to return PendingCheckpointStats 
> --
>
> Key: FLINK-8915
> URL: https://issues.apache.org/jira/browse/FLINK-8915
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> {noformat}
> 2018-03-10 21:47:52,487 ERROR 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler
>   - Implementation error: Unhandled exception.
> java.lang.IllegalArgumentException: Given checkpoint stats object of type 
> org.apache.flink.runtime.checkpoint.PendingCheckpointStats cannot be 
> converted.
>   at 
> org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics.generateCheckpointStatistics(CheckpointStatistics.java:276)
>   at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:146)
>   at 
> org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler.handleRequest(CheckpointingStatisticsHandler.java:54)
>   at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:81)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339296
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  /** Return a deep copy of the [[TableSink]]. */
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
--- End diff --

The docs for overridden methods could be omitted.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
--- End diff --

Use the full class name for `[[Table]]` since it's not imported.
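
For example, the fully qualified Scaladoc link would read:

```
/**
  * Configures the unique key fields of the [[org.apache.flink.table.api.Table]] to write.
  */
```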


---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398018#comment-16398018
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338993
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
--- End diff --

Remove unused imports.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.
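
As a sketch of what querying such a table from outside the job could look like, the snippet below uses Flink's existing QueryableStateClient. The proxy host and port, job id, state name, key, and the exact state layout registered by the sink are all placeholders/assumptions; the actual layout depends on how QueryableStateProcessFunction registers its state:

{code:scala}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.types.Row

object QueryTableStateSketch {
  def main(args: Array[String]): Unit = {
    // Queryable state proxy of a TaskManager and the id of the running job (placeholders).
    val client = new QueryableStateClient("localhost", 9069)
    val jobId = JobID.fromHexString(args(0))

    // The sink in this PR keys the stream by a Row of the key fields, so the key is a Row too.
    val key = Row.of("some-id")
    val keyType = Types.ROW(Types.STRING)

    // The state name and descriptor must match what the sink registered; both are assumed here.
    val stateDescriptor = new ValueStateDescriptor[Row](
      "prefix-table", Types.ROW(Types.STRING, Types.BOOLEAN, Types.STRING))

    val resultFuture = client.getKvState(jobId, "prefix-table", key, keyType, stateDescriptor)

    // Block for the row stored under this key and print it.
    println(resultFuture.get().value())
  }
}
{code}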



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398015#comment-16398015
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
--- End diff --

Use the full class name for `[[Table]]` since it's not imported.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398020#comment-16398020
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339810
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+    val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), TypeInformation.of(classOf[String]))
+    val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
+
+    val src = Row.of(List(1), "a", "b")
+    val key = selector.getKey(JTuple2.of(true, src))
+
+    assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+    val queryConfig = new StreamQueryConfig()
+      .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
+
+    val keys = Array("id")
+    val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+    val fieldNames = Array("id", "is_manager", "name")
+    val fieldTypes: Array[TypeInformation[_]] = Array(
+      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+      TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+      TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+    val func = new QueryableStateProcessFunction("test", queryConfig, keys, fieldNames, fieldTypes)
+
+    val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
+
+    val testHarness = createHarnessTester(operator,
+      new RowKeySelector(Array(0), keyType),
+      keyType)
+
+    testHarness.open()
+
+
--- End diff --

Remove extra blank lines.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398017#comment-16398017
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
--- End diff --

Format the code like this:
```
class QueryableTableSink(
    private val namePrefix: String,
    private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
...
```



> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flnk's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398016#comment-16398016
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339658
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
--- End diff --

Try to avoid using deprecated classes/methods.
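
For reference, the non-deprecated counterpart is `KeyedProcessOperator` (in `org.apache.flink.streaming.api.operators`), which wraps a `KeyedProcessFunction` instead of a `ProcessFunction`. Assuming the process function is first ported to the `KeyedProcessFunction` API (e.g. via the class proposed in #5680; `keyedProcessFunc` below is such a hypothetical instance), the construction could become roughly:

{code:scala}
// Sketch only: assumes keyedProcessFunc is a KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]
// variant of QueryableStateProcessFunction.
val operator = new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](keyedProcessFunc)
{code}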


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398019#comment-16398019
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339296
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  /** Return a deep copy of the [[TableSink]]. */
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
--- End diff --

The docs for overridden methods could be omitted.


> Store streaming, updating tables with unique key in queryable state
> ---
>
>

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338993
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
--- End diff --

Remove unused imports.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339810
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
+
+val testHarness = createHarnessTester(operator,
+  new RowKeySelector(Array(0), keyType),
+  keyType)
+
+testHarness.open()
+
+
--- End diff --

Remove extra blank lines.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174339658
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
--- End diff --

Try to avoid using deprecated classes/methods.
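For instance, if the process function were ported to `KeyedProcessFunction` (see the `process(KeyedProcessFunction)` comment later in this thread), the harness could build the non-deprecated operator instead. A rough sketch with a hypothetical `keyedFunc`, not code from this PR:
```
import java.lang.{Boolean => JBool}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.types.Row

object OperatorSketch {
  // keyedFunc stands in for a KeyedProcessFunction-based port of the sink's
  // process function; it is not part of this PR.
  def buildOperator(
      keyedFunc: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void])
    : KeyedProcessOperator[Row, JTuple2[JBool, Row], Void] =
    new KeyedProcessOperator(keyedFunc)
}
```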


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338981
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
--- End diff --

Format the code like this:
```
class QueryableTableSink(
    private val namePrefix: String,
    private val queryConfig: StreamQueryConfig)
  extends UpsertStreamTableSink[Row]
...
```



---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398001#comment-16398001
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
--- End diff --

This line is too long (should be less than 100 characters).


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
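As context for the feature discussed above, a rough sketch of how a client could look up such state through Flink's queryable state API. The proxy address, job id, state name, and key below are hypothetical; how the sink actually names and structures its state is defined by the PR, not by this sketch.
```
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.queryablestate.client.QueryableStateClient
import org.apache.flink.types.Row

object QueryableSinkLookupSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical proxy host/port and job id.
    val client = new QueryableStateClient("localhost", 9069)
    val jobId = JobID.fromHexString("00000000000000000000000000000000")

    // Key and value types must match what the sink registered.
    val keyType = TypeInformation.of(classOf[String])
    val rowType = TypeInformation.of(classOf[Row])
    val descriptor = new ValueStateDescriptor[Row]("prefix-rowState", rowType)

    // Fetch the current row for key "42" from the running job's queryable state.
    val future = client.getKvState(jobId, "prefix-rowState", "42", keyType, descriptor)
    val state: ValueState[Row] = future.join()
    println(s"row for key 42: ${state.value()}")
  }
}
```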


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
--- End diff --

This line is too long (should be less than 100 characters).
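One way to stay under the limit is to move the message onto its own lines; a sketch of the same method, reformatted:
```
override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
  if (isAppendOnly) {
    throw new IllegalArgumentException(
      "A QueryableTableSink can not be used with append-only tables " +
        "as the table would grow infinitely")
  }
}
```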


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338947
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

This `process(processFunction)` method has been deprecated. Replace it with 
`process(KeyedProcessFunction)`.
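A minimal sketch of the non-deprecated variant (not the PR's actual implementation): a function that extends `KeyedProcessFunction` with the same key, input, and output types, so that `keyBy(...).process(...)` resolves to the `process(KeyedProcessFunction)` overload.
```
import java.lang.{Boolean => JBool}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class QueryableStateKeyedFunction
  extends KeyedProcessFunction[Row, JTuple2[JBool, Row], Void] {

  override def processElement(
      value: JTuple2[JBool, Row],
      ctx: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]#Context,
      out: Collector[Void]): Unit = {
    // value.f0 is the upsert flag and value.f1 the row; updating the queryable
    // state for ctx.getCurrentKey is omitted in this sketch.
  }
}
```
With such a function, `dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)).process(keyedFunction)` no longer hits the deprecated overload.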


---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398000#comment-16398000
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174338947
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

This `process(processFunction)` method has been deprecated. Replace it with 
`process(KeyedProcessFunction)`.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jir

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397998#comment-16397998
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174338580
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 ---
@@ -105,6 +105,8 @@ public T createAndRestore(@Nonnull List> restoreOptions)
 
++alternativeIdx;
 
+   // IMPORTANT: please be careful when modifying the log 
statements because they are used for validation in
+   // the automatic end-to-end tests. Those tests might 
fail if they are not aligned with the log message!
if (restoreState.isEmpty()) {
LOG.debug("Creating {} with empty state.", 
logDescription);
} else {
--- End diff --

nit: since this log message's format matters (the end-to-end tests rely on it), maybe we can introduce a 
`static final String LOG_FORMAT = "Creating {} with empty state."` for it and 
guard it with a unit test (feel free to ignore).


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-13 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174338580
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 ---
@@ -105,6 +105,8 @@ public T createAndRestore(@Nonnull List> restoreOptions)
 
++alternativeIdx;
 
+   // IMPORTANT: please be careful when modifying the log 
statements because they are used for validation in
+   // the automatic end-to-end tests. Those tests might 
fail if they are not aligned with the log message!
if (restoreState.isEmpty()) {
LOG.debug("Creating {} with empty state.", 
logDescription);
} else {
--- End diff --

nit: since this log message's format matters (the end-to-end tests rely on it), maybe we can introduce a 
`static final String LOG_FORMAT = "Creating {} with empty state."` for it and 
guard it with a unit test (feel free to ignore).
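A rough sketch of that pattern (shown in Scala here; the actual class is Java, and the constant and test names are made up):
```
import org.junit.Assert.assertEquals
import org.junit.Test

// The restore code would log via LOG.debug(EMPTY_STATE_LOG_FORMAT, logDescription),
// so the end-to-end test and the production code share a single message definition.
object RestoreLogMessages {
  final val EMPTY_STATE_LOG_FORMAT: String = "Creating {} with empty state."
}

class RestoreLogMessagesTest {
  // Fails if the message changes without the end-to-end test being updated too.
  @Test
  def logFormatMatchesEndToEndTestExpectation(): Unit =
    assertEquals("Creating {} with empty state.", RestoreLogMessages.EMPTY_STATE_LOG_FORMAT)
}
```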


---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397984#comment-16397984
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr Please help to review this.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397985
 ] 

ASF GitHub Bot commented on FLINK-8888:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5663
  
@StephanEwen we are looking to utilize this patch to override discovery 
using ListShards (initially in our connector extension). 

I have tested the SDK version change on top of our Flink 1.4 + patches 
branch with my kinesis quick check app and it works as expected.

I believe @kailashhd had already addressed all other concerns?





> Upgrade AWS SDK in flink-connector-kinesis
> --
>
> Key: FLINK-8888
> URL: https://issues.apache.org/jira/browse/FLINK-8888
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of 
> this version upgrade for KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

2018-03-13 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr Please help to review this.


---


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-13 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5663
  
@StephanEwen we are looking to utilize this patch to override discovery 
using ListShards (initially in our connector extension). 

I have tested the SDK version change on top of our Flink 1.4 + patches 
branch with my kinesis quick check app and it works as expected.

I believe @kailashhd had already addressed all other concerns?





---


[jira] [Commented] (FLINK-6075) ORDER BY *time ASC

2018-03-13 Thread yinhua.dai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397950#comment-16397950
 ] 

yinhua.dai commented on FLINK-6075:
---

[~fhueske]

Thanks for the explanation.

So "ORDER BY" always result in a "retract" stream, right?

What I want is an "append" output stream that could sort by some other fileds 
among all elements received during a thumble window, is that possible to 
express with SQL?

> ORDER BY *time ASC
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>Priority: Major
>  Labels: features
> Attachments: sort.png
>
>
> These will be split in 3 separated JIRA issues. However, the design is the 
> same only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query will be provided without 
> limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `)
> ||Rowtime||Proctime||Stream1  ||Limit 2||Top 2  ||Sort [ASC] ||
> |       |10:00:00|(aaa, 11)|       |       |           |
> |       |10:05:00|(aab, 7) |       |       |           |
> |10-11  |11:00:00|         |aab,aaa|aab,aaa|aab,aaa    |
> |       |11:03:00|(aac,21) |       |       |           |
> |11-12  |12:00:00|         |aab,aaa|aab,aaa|aab,aaa,aac|
> |       |12:10:00|(abb,12) |       |       |           |
> |       |12:15:00|(abb,12) |       |       |           |
> |12-13  |13:00:00|         |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the 
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger 
> (based on LIMIT/TOP parameters). All data will be sorted at once and result 
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort 
> the elemen

[jira] [Commented] (FLINK-6810) Add Some built-in Scalar Function supported

2018-03-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397874#comment-16397874
 ] 

Fabian Hueske commented on FLINK-6810:
--

Hi [~fdiazgon], sure :).

I suggest picking one that does not have an open PR and no dependencies. You 
can have a look at the resolved issues to get an idea of what changes are 
necessary.

Best, Fabian

> Add Some built-in Scalar Function supported
> ---
>
> Key: FLINK-6810
> URL: https://issues.apache.org/jira/browse/FLINK-6810
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
>
> In this JIRA, will create some sub-task for add specific scalar function, 
> such as mathematical-function {{LOG}}, date-functions
>  {{DATEADD}},string-functions {{LPAD}}, etc. 
> I think is good way to let SQL work, and then add TableAPI to supported. So I 
> suggest one scalar function create two sub-task, one is for SQL. another for 
> TableAPI.
> *Note:*
> Every scalar function should add TableAPI doc in  
> {{./docs/dev/table/tableApi.md#built-in-functions}}. 
> Add SQL doc in {{./docs/dev/table/sql.md#built-in-functions}}.
> Welcome anybody to add the sub-task about standard database scalar function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8919) Add KeyedProcessFunctionWIthCleanupState

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397869#comment-16397869
 ] 

ASF GitHub Bot commented on FLINK-8919:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5680
  
Taking a step back. What's the purpose of this change? It adds a class to 
the `flink-table` module that is not used anywhere and not part of the public 
API of the Table API or SQL.
Why do we need it?


> Add KeyedProcessFunctionWIthCleanupState
> 
>
> Key: FLINK-8919
> URL: https://issues.apache.org/jira/browse/FLINK-8919
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Minor
> Fix For: 1.6.0
>
>
> ProcessFunctionWithCleanupState is a useful tool and I think we also need one 
> for the new KeyedProcessFunction api.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5680
  
Taking a step back. What's the purpose of this change? It adds a class to 
the `flink-table` module that is not used anywhere and not part of the public 
API of the Table API or SQL.
Why do we need it?


---


[jira] [Commented] (FLINK-6075) ORDER BY *time ASC

2018-03-13 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397849#comment-16397849
 ] 

Fabian Hueske commented on FLINK-6075:
--

Hi [~yinhua], that is not a limitation of Calcite. In principle, you can sort 
the result of a query on any (combination of) attributes.

However, it is not possible to efficiently execute such a query in streaming 
mode. Ascending time order is possible because records arrive (almost) in that 
order. Any other order is not possible because it would require persisting the 
output of a query over unbounded input, which is obviously not feasible. 
"Sorting in a time window" cannot be expressed in SQL and would, by the way, still mean 
that the records are somehow ordered by time.
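To make that concrete, a small sketch (Flink 1.5-era APIs, illustrative names) of the one ordering that does work as an append stream, sorting on an ascending rowtime attribute:
```
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object SortByRowtimeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Toy input: (timestamp in millis, a, b).
    val events = env
      .fromElements((1000L, "aaa", 11), (2000L, "aab", 7), (3000L, "aac", 21))
      .assignAscendingTimestamps(_._1)

    // Register the stream with an appended rowtime attribute 'rtime.
    tEnv.registerDataStream("events", events, 'ts, 'a, 'b, 'rtime.rowtime)

    // Ascending time order is supported and yields an append-only result.
    tEnv.sqlQuery("SELECT a, b FROM events ORDER BY rtime ASC")
      .toAppendStream[Row]
      .print()

    // ORDER BY b (a non-time attribute) is not supported on a streaming table,
    // since it would require keeping the complete, unbounded result.

    env.execute("order-by-rowtime-sketch")
  }
}
```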

> ORDER BY *time ASC
> --
>
> Key: FLINK-6075
> URL: https://issues.apache.org/jira/browse/FLINK-6075
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: radu
>Priority: Major
>  Labels: features
> Attachments: sort.png
>
>
> These will be split in 3 separated JIRA issues. However, the design is the 
> same only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1)` SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query will be provided without 
> limits an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2/ TOP 2 / [ASC/DESC] `)
> ||Rowtime||Proctime||Stream1  ||Limit 2||Top 2  ||Sort [ASC] ||
> |       |10:00:00|(aaa, 11)|       |       |           |
> |       |10:05:00|(aab, 7) |       |       |           |
> |10-11  |11:00:00|         |aab,aaa|aab,aaa|aab,aaa    |
> |       |11:03:00|(aac,21) |       |       |           |
> |11-12  |12:00:00|         |aab,aaa|aab,aaa|aab,aaa,aac|
> |       |12:10:00|(abb,12) |       |       |           |
> |       |12:15:00|(abb,12) |       |       |           |
> |12-13  |13:00:00|         |abb,abb|abb,abb|abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the

[jira] [Commented] (FLINK-8862) Support HBase snapshot read

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397830#comment-16397830
 ] 

ASF GitHub Bot commented on FLINK-8862:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5639
  
Thanks for the PR @neoremind.
At the moment, the community is busy working on the 1.5 release which means 
that PRs for 1.5 fixes have priority right now. Also a large contribution such 
as this one takes a lot of time to review. Unfortunately, I won't be able to 
review the PR in the near future. Best, Fabian


> Support HBase snapshot read
> ---
>
> Key: FLINK-8862
> URL: https://issues.apache.org/jira/browse/FLINK-8862
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Xu Zhang
>Priority: Major
> Attachments: FLINK-8862-Design-Class-Diagram.png, 
> FLINK-8862-DesignDoc.pdf
>
>
> Flink-hbase connector only supports reading/scanning HBase over region server 
> scanner, there is also snapshot scanning solution, just like Hadoop provides 
> 2 ways to scan HBase, one is TableInputFormat, the other is 
> TableSnapshotInputFormat, so it would be great if flink supports both 
> solutions to ensure more wider usage scope and provide alternatives for users.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5639: [FLINK-8862] [HBase] Support HBase snapshot read

2018-03-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5639
  
Thanks for the PR @neoremind.
At the moment, the community is busy working on the 1.5 release which means 
that PRs for 1.5 fixes have priority right now. Also a large contribution such 
as this one takes a lot of time to review. Unfortunately, I won't be able to 
review the PR in the near future. Best, Fabian


---


[jira] [Commented] (FLINK-6810) Add Some built-in Scalar Function supported

2018-03-13 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-6810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397512#comment-16397512
 ] 

Fernando Díaz commented on FLINK-6810:
--

Hi. I would like to work on any of these. They seem OK for a newcomer ;)

> Add Some built-in Scalar Function supported
> ---
>
> Key: FLINK-6810
> URL: https://issues.apache.org/jira/browse/FLINK-6810
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
>
> In this JIRA, will create some sub-task for add specific scalar function, 
> such as mathematical-function {{LOG}}, date-functions
>  {{DATEADD}},string-functions {{LPAD}}, etc. 
> I think is good way to let SQL work, and then add TableAPI to supported. So I 
> suggest one scalar function create two sub-task, one is for SQL. another for 
> TableAPI.
> *Note:*
> Every scalar function should add TableAPI doc in  
> {{./docs/dev/table/tableApi.md#built-in-functions}}. 
> Add SQL doc in {{./docs/dev/table/sql.md#built-in-functions}}.
> Welcome anybody to add the sub-task about standard database scalar function.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5663: [FLINK-8888] [Kinesis Connectors] Update the AWS SDK for ...

2018-03-13 Thread kailashhd
Github user kailashhd commented on the issue:

https://github.com/apache/flink/pull/5663
  
Sorry for the delay on this. Confirming that I tested this using kinesalite 
and kinesis for both the consumer and producer functionality. I had some 
trouble when connecting to kinesalite due to the following issue for 
consumers [FLINK-8936]. I am planning to fix that in the next iteration and 
left it out of this change to keep the PRs cleaner and more modular. 


---


[jira] [Commented] (FLINK-8888) Upgrade AWS SDK in flink-connector-kinesis

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397485
 ] 

ASF GitHub Bot commented on FLINK-8888:
---

Github user kailashhd commented on the issue:

https://github.com/apache/flink/pull/5663
  
Sorry for the delay on this. Confirming that I tested this using kinesalite 
and kinesis for both the consumer and producer functionality. I had some 
trouble when connecting to kinesalite due to the following issue for 
consumers [FLINK-8936]. I am planning to fix that in the next iteration and 
left it out of this change to keep the PRs cleaner and more modular. 


> Upgrade AWS SDK in flink-connector-kinesis
> --
>
> Key: FLINK-8888
> URL: https://issues.apache.org/jira/browse/FLINK-8888
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Bump up the java aws sdk version to 1.11.272. Evaluate also the impact of 
> this version upgrade for KCL and KPL versions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397394#comment-16397394
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231064
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2 item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2 item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
--- End diff --

Replace the console output with logging; you can refer to 
`RocksDBListStatePerformanceTest.java`.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>   

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397393#comment-16397393
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231142
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2 item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2 item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with d

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231064
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
--- End diff --

Replace the console output with logging; you can refer to 
`RocksDBListStatePerformanceTest.java`.
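
For example, a minimal sketch of what that could look like (assuming the usual 
SLF4J setup the other RocksDB tests already use; this is not the PR's code):

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WriteBatchBenchmarkLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(WriteBatchBenchmarkLoggingSketch.class);

    public static void main(String[] args) {
        // Same information as the System.out lines, but routed through the test logger.
        LOG.info("--> put with disableWAL=true VS put with disableWAL=false <--");
    }
}
{code}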


---


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397392#comment-16397392
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231173
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with d

[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397395#comment-16397395
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174230739
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

The benchmark test needs to be moved to the 
`org.apache.flink.contrib.streaming.state.benchmark` package.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.
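
For reference, this is what a plain {{WriteBatch}} bulk load looks like with the 
RocksDB Java API (a sketch for illustration only, not the wrapper introduced by 
the PR):

{code}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public class WriteBatchBulkLoadSketch {

    /** Buffers all puts in a WriteBatch and applies them with a single write() call. */
    public static void bulkLoad(RocksDB db, ColumnFamilyHandle handle,
                                byte[][] keys, byte[][] values) throws RocksDBException {
        try (WriteOptions options = new WriteOptions().setDisableWAL(true);
             WriteBatch batch = new WriteBatch()) {
            for (int i = 0; i < keys.length; i++) {
                batch.put(handle, keys[i], values[i]); // buffered in memory, not yet visible
            }
            db.write(options, batch); // one write applies the whole batch
        }
    }
}
{code}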



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174230739
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
--- End diff --

The benchmark test needs to be moved to the 
`org.apache.flink.contrib.streaming.state.benchmark` package.


---


[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231173
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with disableWAL=true VS write batch disableWAL = 
true
+   System.out.println("--> write batch with disableWAL=true VS 
write batch disableWAL = true <--");
+   benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+   benchMar

[GitHub] flink pull request #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWra...

2018-03-13 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5650#discussion_r174231142
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBWriteBatchWrapperTest.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to guard {@link RocksDBWriteBatchWrapper}.
+ */
+public class RocksDBWriteBatchWrapperTest {
+
+   @Rule
+   public TemporaryFolder folder = new TemporaryFolder();
+
+   @Test
+   public void basicTest() throws Exception {
+
+   List<Tuple2<byte[], byte[]>> data = new ArrayList<>(1);
+   for (int i = 0; i < 1; ++i) {
+   data.add(new Tuple2<>(("key:" + i).getBytes(), 
("value:" + i).getBytes()));
+   }
+
+   try (RocksDB db = 
RocksDB.open(folder.newFolder().getAbsolutePath());
+   WriteOptions options = new 
WriteOptions().setDisableWAL(true);
+   ColumnFamilyHandle handle = db.createColumnFamily(new 
ColumnFamilyDescriptor("test".getBytes()));
+   RocksDBWriteBatchWrapper writeBatchWrapper = new 
RocksDBWriteBatchWrapper(db, options, 200)) {
+
+   // insert data
+   for (Tuple2<byte[], byte[]> item : data) {
+   writeBatchWrapper.put(handle, item.f0, item.f1);
+   }
+   writeBatchWrapper.flush();
+
+   // valid result
+   for (Tuple2<byte[], byte[]> item : data) {
+   Assert.assertArrayEquals(item.f1, 
db.get(handle, item.f0));
+   }
+   }
+   }
+
+   @Test
+   @Ignore
+   public void benchMark() throws Exception {
+
+   // put with disableWAL=true VS put with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS put with 
disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.PUT);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.PUT);
+
+   // put with disableWAL=true VS write batch with disableWAL=false
+   System.out.println("--> put with disableWAL=true VS write batch 
with disableWAL=false <--");
+   benchMarkHelper(1_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(10_000, true, WRITETYPE.PUT);
+   benchMarkHelper(10_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(100_000, true, WRITETYPE.PUT);
+   benchMarkHelper(100_000, false, WRITETYPE.WRITE_BATCH);
+
+   benchMarkHelper(1_000_000, true, WRITETYPE.PUT);
+   benchMarkHelper(1_000_000, false, WRITETYPE.WRITE_BATCH);
+
+   // write batch with disableWAL=true VS write batch disableWAL = 
true
+   System.out.println("--> write batch with disableWAL=true VS 
write batch disableWAL = true <--");
+   benchMarkHelper(1_000, true, WRITETYPE.WRITE_BATCH);
+   benchMar

[jira] [Commented] (FLINK-7521) Remove the 10MB limit from the current REST implementation.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397379#comment-16397379
 ] 

ASF GitHub Bot commented on FLINK-7521:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5685#discussion_r174225988
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Same as {@link 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder}
+ * but returns HTTP 413 to the client if the payload exceeds {@link 
#maxContentLength}.
+ */
+public class FlinkHttpObjectAggregator extends 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator 
{
+
+   private final Map<String, String> responseHeaders;
+
+   public FlinkHttpObjectAggregator(final int maxContentLength, final Map<String, String> responseHeaders) {
+   super(maxContentLength);
+   this.responseHeaders = responseHeaders;
--- End diff --

A `checkNotNull` call or a `@NonNull` annotation is missing here.
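
For example, the constructor could fail fast (a sketch only, assuming Flink's 
`org.apache.flink.util.Preconditions` utility):

{code}
import static org.apache.flink.util.Preconditions.checkNotNull;

public FlinkHttpObjectAggregator(final int maxContentLength, final Map<String, String> responseHeaders) {
    super(maxContentLength);
    this.responseHeaders = checkNotNull(responseHeaders, "responseHeaders must not be null");
}
{code}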


> Remove the 10MB limit from the current REST implementation.
> ---
>
> Key: FLINK-7521
> URL: https://issues.apache.org/jira/browse/FLINK-7521
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip6
> Fix For: 1.5.0
>
>
> In the current {{AbstractRestServer}} we impose an upper bound of 10MB on the 
> states we can transfer. This is in the line {{.addLast(new 
> HttpObjectAggregator(1024 * 1024 * 10))}} of the server implementation. 
> This limit is restrictive for some of the use cases planned to use this 
> implementation (e.g. the job submission client, which has to send full jars, 
> or the queryable state client, which may have to receive states bigger than 
> that).
> This issue proposes the elimination of this limit.
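
A rough sketch of the direction (illustrative only; it uses plain Netty class 
names rather than Flink's shaded ones, and the way the limit is supplied is an 
assumption, not the actual configuration key):

{code}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;

public class ConfigurableRestChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final int maxContentLengthBytes; // read from configuration instead of hard-coding 10 MB

    public ConfigurableRestChannelInitializer(int maxContentLengthBytes) {
        this.maxContentLengthBytes = maxContentLengthBytes;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
            .addLast(new HttpServerCodec())
            // requests larger than the configured limit are rejected with HTTP 413
            .addLast(new HttpObjectAggregator(maxContentLengthBytes));
    }
}
{code}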



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5685: [FLINK-7521][flip6] Remove the 10MB limit from the...

2018-03-13 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5685#discussion_r174225988
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FlinkHttpObjectAggregator.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Same as {@link 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder}
+ * but returns HTTP 413 to the client if the payload exceeds {@link 
#maxContentLength}.
+ */
+public class FlinkHttpObjectAggregator extends 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator 
{
+
+   private final Map<String, String> responseHeaders;
+
+   public FlinkHttpObjectAggregator(final int maxContentLength, final Map<String, String> responseHeaders) {
+   super(maxContentLength);
+   this.responseHeaders = responseHeaders;
--- End diff --

A `checkNotNull` call or a `@NonNull` annotation is missing here.


---


[jira] [Commented] (FLINK-8843) Decouple bind REST address from advertised address

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397324#comment-16397324
 ] 

ASF GitHub Bot commented on FLINK-8843:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5632
  
I'll verify the changes @yanghua 


> Decouple bind REST address from advertised address
> --
>
> Key: FLINK-8843
> URL: https://issues.apache.org/jira/browse/FLINK-8843
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> The {{RestServerEndpoint}} is currently bound to the same address that is 
> also advertised to the client, namely {{RestOptions#REST_ADDRESS}}. It would 
> be better to start the {{RestServerEndpoint}} listening on all addresses by 
> binding to {{0.0.0.0}}.
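
Conceptually, the separation looks like this (a minimal, self-contained sketch 
with plain JDK classes; not Flink code):

{code}
import java.net.InetAddress;
import java.net.ServerSocket;

public class BindVersusAdvertise {

    public static void main(String[] args) throws Exception {
        int port = 8081;
        // Bind the server socket to the wildcard address so it is reachable on every interface.
        try (ServerSocket server = new ServerSocket(port, 50, InetAddress.getByName("0.0.0.0"))) {
            // Advertise a routable host name to clients instead of the bind address.
            String advertised = "http://" + InetAddress.getLocalHost().getCanonicalHostName() + ":" + port;
            System.out.println("bound to " + server.getLocalSocketAddress() + ", advertising " + advertised);
        }
    }
}
{code}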



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5632: [FLINK-8843][REST] Decouple bind REST address from advert...

2018-03-13 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5632
  
I'll verify the changes @yanghua 


---


[jira] [Assigned] (FLINK-8891) RestServerEndpoint can bind on local address only

2018-03-13 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8891:
---

Assignee: Gary Yao

> RestServerEndpoint can bind on local address only
> -
>
> Key: FLINK-8891
> URL: https://issues.apache.org/jira/browse/FLINK-8891
> Project: Flink
>  Issue Type: Bug
>  Components: REST, YARN
>Affects Versions: 1.5.0
> Environment: EC2 AMI debian-jessie-amd64-hvm-2017-01-15-1221-ebs 
> (ami-5900cc36)
> Hadoop 2.8.3
> Flink commit 80020cb5866c8bac67a48f89aa481de7de262f83
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> When deploying a Flink session on YARN, the {{DispatcherRestEndpoint}} may 
> incorrectly bind on a local address. When this happens, the job submission 
> and all REST API calls using a non-local address will fail. Setting 
> {{rest.address: 0.0.0.0}} in {{flink-conf.yaml}} has no effect because the 
> value is overridden.
> *znode leader contents*
> {noformat}
> [zk: localhost:2181(CONNECTED) 3] get 
> /flink/application_1520439896153_0001/leader/rest_server_lock
> ??whttp://127.0.1.1:56299srjava.util.UUIDm?/J
>  leastSigBitsJ
>   
> mostSigBitsxp??L???g?M??KFK
> cZxid = 0x1000a
> ctime = Wed Mar 07 16:25:21 UTC 2018
> mZxid = 0x1000a
> mtime = Wed Mar 07 16:25:21 UTC 2018
> pZxid = 0x1000a
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x5620147c122
> dataLength = 106
> numChildren = 0
> {noformat}
> *Contents of {{/etc/hosts}}*
> {noformat}
> 127.0.1.1 ip-172-31-36-187.eu-central-1.compute.internal ip-172-31-36-187
> 127.0.0.1 localhost
> # The following lines are desirable for IPv6 capable hosts
> ::1 ip6-localhost ip6-loopback
> fe00::0 ip6-localnet
> ff00::0 ip6-mcastprefix
> ff02::1 ip6-allnodes
> ff02::2 ip6-allrouters
> ff02::3 ip6-allhosts
> {noformat}
> Note that without the first line, the problem does not appear.
> *Error message & Stacktrace*
> {noformat}
> 2018-03-07 16:25:24,267 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found 
> application JobManager host name 
> 'ip-172-31-44-106.eu-central-1.compute.internal' and port '56299' from 
> supplied application id 'application_1520439896153_0001'
> Using the parallelism provided by the remote cluster (0). To use another 
> parallelism, set it at the ./bin/flink client.
> Starting execution of program
> STDERR:
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not submit 
> job 6243b830a6cb1a0b6605a15a7d3d81d4.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:231)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:457)
>   at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:403)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:780)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:274)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:209)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1019)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$4(RestClusterClient.java:327)
>   at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>   at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>   at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>   at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>   at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:196)
>   at 
> org.ap
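
The resolution behaviour behind this report can be reproduced with a few lines 
of plain Java (a sketch; with the {{/etc/hosts}} entry shown above, resolving 
the machine's own host name yields a loopback address):

{code}
import java.net.InetAddress;

public class LocalHostResolutionCheck {

    public static void main(String[] args) throws Exception {
        InetAddress self = InetAddress.getLocalHost();
        // With "127.0.1.1 ip-172-31-36-187..." in /etc/hosts this prints a 127.x.x.x address.
        System.out.println(self.getHostName() + " -> " + self.getHostAddress());
    }
}
{code}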

[jira] [Assigned] (FLINK-8843) Decouple bind REST address from advertised address

2018-03-13 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8843:
---

Assignee: Gary Yao  (was: vinoyang)

> Decouple bind REST address from advertised address
> --
>
> Key: FLINK-8843
> URL: https://issues.apache.org/jira/browse/FLINK-8843
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> The {{RestServerEndpoint}} is currently bound to the same address which is 
> also advertised to the client, namely {{RestOptions#REST_ADDRESS}}. It would 
> be better to start the {{RestServerEndpoint}} listening on all address by 
> binding to {{0.0.0.0}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8934) Cancel slot requests for otherwisely fulfilled requests

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397309#comment-16397309
 ] 

ASF GitHub Bot commented on FLINK-8934:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5687


> Cancel slot requests for otherwisely fulfilled requests
> ---
>
> Key: FLINK-8934
> URL: https://issues.apache.org/jira/browse/FLINK-8934
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> If a slot request is fulfilled with a different allocation id, then we should 
> cancel the other slot request at the {{ResourceManager}}. Otherwise we might 
> have some stale slot requests which first need to time out before slots are 
> available again.
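
The intended bookkeeping, roughly (the names and types below are invented for 
illustration and are not Flink's actual classes):

{code}
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class PendingSlotRequests {

    private final Map<UUID, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Completes a pending request; cancels the requested allocation if it was fulfilled otherwise. */
    public void fulfill(UUID requestedAllocationId, UUID actualAllocationId, Object slot,
                        Consumer<UUID> cancelAtResourceManager) {
        CompletableFuture<Object> request = pending.remove(requestedAllocationId);
        if (request == null) {
            return;
        }
        request.complete(slot);
        if (!requestedAllocationId.equals(actualAllocationId)) {
            // The slot carries a different allocation id, so the original request is now stale.
            cancelAtResourceManager.accept(requestedAllocationId);
        }
    }
}
{code}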



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8812) Possible resource leak in Flip6

2018-03-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-8812:


Assignee: Till Rohrmann

> Possible resource leak in Flip6
> ---
>
> Key: FLINK-8812
> URL: https://issues.apache.org/jira/browse/FLINK-8812
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In this build (https://travis-ci.org/zentol/flink/builds/347373839) I set the 
> codebase to flip6 for half the profiles to find failing tests.
> The "libraries" job (https://travis-ci.org/zentol/flink/jobs/347373851) 
> failed with an OutOfMemoryError.
> This could mean that there is a memory-leak somewhere.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5687: [FLINK-8934] [flip6] Properly cancel slot requests...

2018-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5687


---


[jira] [Closed] (FLINK-8934) Cancel slot requests for otherwisely fulfilled requests

2018-03-13 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8934.

Resolution: Fixed

Fixed via

1.6.0: 445cdfd57941c4c888a6e73356558fd5f11d443c

1.5.0: 9551c6fa91bc1cc12352c9fe8464b5baf868268e

> Cancel slot requests for otherwisely fulfilled requests
> ---
>
> Key: FLINK-8934
> URL: https://issues.apache.org/jira/browse/FLINK-8934
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> If a slot request is fulfilled with a different allocation id, then we should 
> cancel the other slot request at the {{ResourceManager}}. Otherwise we might 
> have some stale slot requests which first need to time out before slots are 
> available again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8934) Cancel slot requests for otherwisely fulfilled requests

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397284#comment-16397284
 ] 

ASF GitHub Bot commented on FLINK-8934:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5687
  
Thanks for the review @zentol. I'll make the naming a bit clearer and then 
merge the PR.


> Cancel slot requests for otherwisely fulfilled requests
> ---
>
> Key: FLINK-8934
> URL: https://issues.apache.org/jira/browse/FLINK-8934
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> If a slot request is fulfilled with a different allocation id, then we should 
> cancel the other slot request at the {{ResourceManager}}. Otherwise we might 
> have some stale slot requests which first need to time out before slots are 
> available again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5687: [FLINK-8934] [flip6] Properly cancel slot requests of oth...

2018-03-13 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5687
  
Thanks for the review @zentol. I'll make the naming a bit clearer and then 
merge the PR.


---


[jira] [Commented] (FLINK-8872) Yarn detached mode via -yd does not detach

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397194#comment-16397194
 ] 

ASF GitHub Bot commented on FLINK-8872:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5672#discussion_r174194372
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java ---
@@ -40,6 +40,7 @@ public Flip6DefaultCLI(Configuration configuration) {
 
@Override
public boolean isActive(CommandLine commandLine) {
+   this.detachedMode = 
commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
--- End diff --

Why not introduce an abstract method `isDetached(CommandLine)` in 
`CustomCommandLine`?
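
Something along these lines (a sketch of the suggestion only; the real 
`CustomCommandLine` interface may differ):

{code}
import org.apache.commons.cli.CommandLine;

public abstract class CustomCommandLineSketch {

    /** Decides, per command line implementation, whether detached execution was requested. */
    public abstract boolean isDetached(CommandLine commandLine);

    public boolean isActive(CommandLine commandLine) {
        // Each implementation inspects its own options; placeholder here.
        return false;
    }
}
{code}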


> Yarn detached mode via -yd does not detach
> --
>
> Key: FLINK-8872
> URL: https://issues.apache.org/jira/browse/FLINK-8872
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Running a YARN per-job cluster in detached mode currently does not work; the 
> client waits for the job to finish.
> Example:
> {code}
> ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input
> {code}
> Output in case of an infinite program would then end with something like this:
> {code}
> 2018-03-05 13:41:23,311 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-03-05 13:41:23,313 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> 2018-03-05 13:41:28,342 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN 
> application has been deployed successfully.
> 2018-03-05 13:41:28,343 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink 
> YARN client has been started in detached mode. In order to stop Flink on 
> YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1519984124671_0006
> Please also note that the temporary files of the YARN session in the home 
> directoy will not be removed.
> Starting execution of program
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5672: [FLINK-8872][flip6] fix yarn detached mode command...

2018-03-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5672#discussion_r174194372
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java ---
@@ -40,6 +40,7 @@ public Flip6DefaultCLI(Configuration configuration) {
 
@Override
public boolean isActive(CommandLine commandLine) {
+   this.detachedMode = 
commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
--- End diff --

Why not introduce an abstract method `isDetached(CommandLine)` in 
`CustomCommandLine`?


---


[GitHub] flink issue #5692: [docs] Fix typo in Python API docs

2018-03-13 Thread florianschmidt1994
Github user florianschmidt1994 commented on the issue:

https://github.com/apache/flink/pull/5692
  
@zentol I found this browsing through the docs


---


[GitHub] flink pull request #5692: [docs] Fix typo in Python API docs

2018-03-13 Thread florianschmidt1994
GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5692

[docs] Fix typo in Python API docs

Rename `Tockenizer` to `Tokenizer` in python docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
fix-typo-in-python-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5692.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5692


commit d58a919e87fbbdc1f256358f797194cc2ea81f1c
Author: Florian Schmidt 
Date:   2018-03-13T15:54:06Z

[Docs][Typo] Fix typo in Python API docs




---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-13 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174165460
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+   } else if ("rocks".equals(stateBackend)) {
+   boolean incrementalCheckpoints = 
pt.getBoolean("incrementalCheckpoints

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397111#comment-16397111
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174165460
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397110#comment-16397110
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174175230
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-13 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r174175230
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,450 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+   } else if ("rocks".equals(stateBackend)) {
+   boolean incrementalCheckpoints = 
pt.getBoolean("incrementalCheckpoints

[jira] [Commented] (FLINK-8937) Refactor flink end-to-end test skeleton

2018-03-13 Thread Florian Schmidt (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397089#comment-16397089
 ] 

Florian Schmidt commented on FLINK-8937:


[~aljoscha] any opinions on this?

> Refactor flink end-to-end test skeleton
> ---
>
> Key: FLINK-8937
> URL: https://issues.apache.org/jira/browse/FLINK-8937
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>
> I was thinking about refactoring the end-to-end test skeleton. The following 
> ideas came to mind:
> - Allow each individual test script to specify setup and cleanup methods that 
> get executed before and after each test, especially if that test fails and 
> the EXIT trap is triggered
> - Refactor adding a new test script into a function that provides uniform 
> error handling
> - Add helper methods for common tasks like waiting for a pattern to appear in 
> some log file (e.g. to ensure that a task is running)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8937) Refactor flink end-to-end test skeleton

2018-03-13 Thread Florian Schmidt (JIRA)
Florian Schmidt created FLINK-8937:
--

 Summary: Refactor flink end-to-end test skeleton
 Key: FLINK-8937
 URL: https://issues.apache.org/jira/browse/FLINK-8937
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Florian Schmidt
Assignee: Florian Schmidt


I was thinking about refactoring the end-to-end test skeleton. The following 
ideas came to mind:

- Allow each individual test script to specify setup and cleanup methods that 
get executed before and after each test, especially if that test fails and the 
EXIT trap is triggered
- Refactor adding a new test script into a function that provides uniform 
error handling
- Add helper methods for common tasks like waiting for a pattern to appear in 
some log file (e.g. to ensure that a task is running)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8936) Provide only AWS region or endpoint in line with the SDK

2018-03-13 Thread Kailash Hassan Dayanand (JIRA)
Kailash Hassan Dayanand created FLINK-8936:
--

 Summary: Provide only AWS region or endpoint in line with the SDK
 Key: FLINK-8936
 URL: https://issues.apache.org/jira/browse/FLINK-8936
 Project: Flink
  Issue Type: Improvement
Reporter: Kailash Hassan Dayanand


Based on recent upgrades to the aws-java-sdk [https://goo.gl/yGLRRG] (in 
1.11.79), it is no longer possible to specify both the region and the endpoint 
in the Kinesis configuration. The earlier PR that introduced the endpoint 
option (FLINK-4197) also had comments about conflicting endpoints and regions, 
but this was acceptable at the time because the Java SDK did not enforce the 
restriction.

Reason for this change: it makes it easier to connect to kinesalite in a 
development environment.
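
For illustration only (not part of the issue), a minimal sketch of how the 
aws-java-sdk 1.11.x builders expose this either/or choice; the local kinesalite 
endpoint and port below are assumptions:

{code}
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

public class KinesisEndpointOrRegionExample {

    public static void main(String[] args) {
        // Option 1: configure only the region.
        AmazonKinesis byRegion = AmazonKinesisClientBuilder.standard()
            .withRegion("us-east-1")
            .build();

        // Option 2: configure only an explicit endpoint (plus signing region),
        // e.g. for a local kinesalite instance. Setting both a region and an
        // endpoint configuration on the same builder fails at build() time.
        AmazonKinesis byEndpoint = AmazonKinesisClientBuilder.standard()
            .withEndpointConfiguration(
                new AwsClientBuilder.EndpointConfiguration("http://localhost:4567", "us-east-1"))
            .build();
    }
}
{code}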

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8872) Yarn detached mode via -yd does not detach

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397059#comment-16397059
 ] 

ASF GitHub Bot commented on FLINK-8872:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5672#discussion_r174161228
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java ---
@@ -40,6 +40,7 @@ public Flip6DefaultCLI(Configuration configuration) {
 
@Override
public boolean isActive(CommandLine commandLine) {
+   this.detachedMode = 
commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
--- End diff --

I wonder if there is a better place to initialize this field, instead of doing 
it as an undocumented side effect of something that sounds like an unrelated 
getter.
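
Purely as an illustration of the separation being asked for (the class and 
option names below are made up, not the actual Flink code): keep the check 
side-effect free and set the flag in an explicit, documented parsing step.

{code}
import org.apache.commons.cli.CommandLine;

public class SideEffectFreeCliCheckExample {

    private boolean detachedMode;

    /** Pure predicate: only answers whether this CLI should handle the command line. */
    public boolean isActive(CommandLine commandLine) {
        return commandLine.hasOption("flip6");   // assumed activation option
    }

    /** Explicit parsing step: side effects live here instead of in the check above. */
    public void applyCommandLine(CommandLine commandLine) {
        this.detachedMode = commandLine.hasOption("d");   // assumed detached option
    }
}
{code}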


> Yarn detached mode via -yd does not detach
> --
>
> Key: FLINK-8872
> URL: https://issues.apache.org/jira/browse/FLINK-8872
> Project: Flink
>  Issue Type: Bug
>  Components: Client, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Running yarn per-job cluster in detached mode currently does not work and 
> waits for the job to finish.
> Example:
> {code}
> ./bin/flink run -m yarn-cluster -yn 10 -yjm 768 -ytm 3072 -ys 2 -yd -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input
> {code}
> Output in case of an infinite program would then end with something like this:
> {code}
> 2018-03-05 13:41:23,311 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for 
> the cluster to be allocated
> 2018-03-05 13:41:23,313 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying 
> cluster, current state ACCEPTED
> 2018-03-05 13:41:28,342 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - YARN 
> application has been deployed successfully.
> 2018-03-05 13:41:28,343 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The Flink 
> YARN client has been started in detached mode. In order to stop Flink on 
> YARN, use the following command or a YARN web interface to stop it:
> yarn application -kill application_1519984124671_0006
> Please also note that the temporary files of the YARN session in the home 
> directoy will not be removed.
> Starting execution of program
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5672: [FLINK-8872][flip6] fix yarn detached mode command...

2018-03-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5672#discussion_r174161228
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java ---
@@ -40,6 +40,7 @@ public Flip6DefaultCLI(Configuration configuration) {
 
@Override
public boolean isActive(CommandLine commandLine) {
+   this.detachedMode = 
commandLine.hasOption(CliFrontendParser.DETACHED_OPTION.getOpt());
--- End diff --

I wonder if there is a better place to initialize this field, instead of doing 
it as an undocumented side effect of something that sounds like an unrelated 
getter.


---


[jira] [Commented] (FLINK-8913) RocksDB state backend crashes in alpine image

2018-03-13 Thread Joshua Griffith (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397051#comment-16397051
 ] 

Joshua Griffith commented on FLINK-8913:


It seems like the official Flink docker image shouldn't include Alpine, or at 
least it should include a disclaimer regarding RocksDB. Someone who doesn't 
know about Alpine's use of MUSL instead of glibc might have a hard time 
figuring out what the problem is.

> RocksDB state backend crashes in alpine image
> -
>
> Key: FLINK-8913
> URL: https://issues.apache.org/jira/browse/FLINK-8913
> Project: Flink
>  Issue Type: Bug
>  Components: Docker, State Backends, Checkpointing
>Affects Versions: 1.4.1
> Environment: {{~> minikube version}}
> {{minikube version: v0.25.0}} {{~> minikube config view}}
> {{- cpus: 4}}
> {{- kubernetes-version: v1.8.0}}
> {{- memory: 8192}}
> {{- vm-driver: hyperkit}}
> {{- WantReportError: true}}
>Reporter: Joshua Griffith
>Priority: Major
>
> Running the word count example jar with the RocksDB state backend enabled 
> (via job manager configuration) with the Flink hadoop28-scala_2.11-alpine 
> image crashes with the following error:
>  
> {{2018-03-09 21:09:12,928 INFO}}{{2018-03-09 21:09:12,892 INFO 
> org.apache.flink.runtime.taskmanager.Task - Source: Collection Source -> Flat 
> Map (1/1) (38365cd6326de6df92b72d1acbda0f72) switched from RUNNING to 
> FINISHED.}}
> {{2018-03-09 21:09:12,892 INFO org.apache.flink.runtime.taskmanager.Task - 
> Freeing task resources for Source: Collection Source -> Flat Map (1/1) 
> (38365cd6326de6df92b72d1acbda0f72).}}
> {{2018-03-09 21:09:12,894 INFO org.apache.flink.runtime.taskmanager.Task - 
> Ensuring all FileSystem streams are closed for task Source: Collection Source 
> -> Flat Map (1/1) (38365cd6326de6df92b72d1acbda0f72) [FINISHED]}}
> {{2018-03-09 21:09:12,897 INFO 
> org.apache.flink.runtime.taskmanager.TaskManager - Un-registering task and 
> sending final execution state FINISHED to JobManager for task Source: 
> Collection Source -> Flat Map (38365cd6326de6df92b72d1acbda0f72)}}
> {{2018-03-09 21:09:12,902 INFO 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully 
> loaded RocksDB native 
> libraryorg.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - 
> Initializing RocksDB keyed state backend from snapshot.}}
> {{#}}
> {{# A fatal error has been detected by the Java Runtime Environment:}}
> {{#}}
> {{# SIGSEGV (0xb) at pc=0x001432b6, pid=1, tid=0x7fc4036e1ae8}}
> {{#}}
> {{# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build 
> 1.8.0_151-b12)}}
> {{# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 
> compressed oops)}}
> {{# Derivative: IcedTea 3.6.0}}
> {{# Distribution: Custom build (Tue Nov 21 11:22:36 GMT 2017)}}
> {{# Problematic frame:}}
> {{# C 0x001432b6}}
> {{#}}
> {{# Core dump written. Default location: /opt/flink/core or core.1}}
> {{#}}
> {{# An error report file with more information is saved as:}}
> {{# /opt/flink/hs_err_pid1.log}}
> {{#}}
> {{# If you would like to submit a bug report, please include}}
> {{# instructions on how to reproduce the bug and visit:}}
> {{# http://icedtea.classpath.org/bugzilla}}
>  
> Switching to the Debian image fixes this issue. I imagine it has to do with 
> alpine's alternative libc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397015#comment-16397015
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174150658
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

True, but after also discussing this with Stephan and Aljoscha, we concluded 
that it is probably not the right place, just from a "separation-of-concerns" 
point of view.

That said, the more I worked on it, the more I thought that the 
`InternalKvStates` could also be a place for this. But we can leave that for a 
future discussion when we re-prioritize QS.
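
For readers following along, a rough standalone sketch of the per-thread 
duplication idea discussed here (class and method names are hypothetical, and 
cleanup of entries for finished threads is omitted):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.typeutils.TypeSerializer;

public class PerThreadSerializerCache<T> {

    private final TypeSerializer<T> original;
    private final Map<Thread, TypeSerializer<T>> byThread = new ConcurrentHashMap<>();

    public PerThreadSerializerCache(TypeSerializer<T> original) {
        this.original = original;
    }

    /**
     * Returns a serializer that is safe to use on the calling thread: duplicate()
     * is called at most once per thread, so a potentially expensive duplication
     * (e.g. for Kryo-based serializers) is amortized over many requests instead
     * of being paid on every call.
     */
    public TypeSerializer<T> forCurrentThread() {
        return byThread.computeIfAbsent(Thread.currentThread(), t -> original.duplicate());
    }
}
{code}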


> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174150658
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

True, but after also discussing this with Stephan and Aljoscha, we concluded 
that it is probably not the right place, just from a "separation-of-concerns" 
point of view.

That said, the more I worked on it, the more I thought that the 
`InternalKvStates` could also be a place for this. But we can leave that for a 
future discussion when we re-prioritize QS.


---


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16397008#comment-16397008
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174148331
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

well we could move the thread-caching into the `InternalKvStates`.


> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174148331
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

well we could move the thread-caching into the `InternalKvStates`.


---


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396994#comment-16396994
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174145613
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

"Word on the street" :P is that `Kryo` duplication is pretty expensive. 
This is why I went for this solution.


> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174145613
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

"Word on the street" :P is that `Kryo` duplication is pretty expensive. 
This is why I went for this solution.


---


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396988#comment-16396988
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174144247
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Ah, I mistakenly thought that `getSerializedValue()` is also called outside of 
QS.

What would be the overhead of always duplicating serializers within 
`getSerializedValue()`?


> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174144247
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Ah, I mistakenly thought that `getSerializedValue()` is also called outside of 
QS.

What would be the overhead of always duplicating serializers within 
`getSerializedValue()`?


---


[jira] [Commented] (FLINK-8906) Flip6DefaultCLI is not tested in org.apache.flink.client.cli tests

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396971#comment-16396971
 ] 

ASF GitHub Bot commented on FLINK-8906:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5671#discussion_r174142014
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
 ---
@@ -39,13 +47,27 @@
 /**
  * Tests for the CANCEL command.
  */
+@RunWith(Parameterized.class)
 public class CliFrontendCancelTest extends TestLogger {
 
+   @Parameterized.Parameters(name = "Mode = {0}")
--- End diff --

Everything about introducing this parameter is duplicated in all the test 
classes. I would create a common super class that does this parameter iteration 
and implements the `getCli()` and `getConfiguration()` methods, instead of 
having them as static methods.
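
A standalone JUnit 4 sketch of what such a shared base class could look like 
(class and mode names are made up for illustration, not the actual Flink test 
code):

{code}
import java.util.Arrays;
import java.util.Collection;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public abstract class AbstractCliModeTest {

    /** The mode iteration lives once in the base class; subclasses inherit it. */
    @Parameterized.Parameters(name = "Mode = {0}")
    public static Collection<Object[]> modes() {
        return Arrays.asList(new Object[][] {{"default"}, {"flip6"}});
    }

    @Parameterized.Parameter
    public String mode;

    /** Instance method instead of a static helper, so tests pick up the current mode. */
    protected boolean isFlip6Mode() {
        return "flip6".equals(mode);
    }

    // Concrete test classes would extend this base class and contain only the test
    // methods; getCli()/getConfiguration() would be implemented here against 'mode'.
}
{code}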


> Flip6DefaultCLI is not tested in org.apache.flink.client.cli tests
> --
>
> Key: FLINK-8906
> URL: https://issues.apache.org/jira/browse/FLINK-8906
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Various tests in {{org.apache.flink.client.cli}} only test with the 
> {{DefaultCLI}} but should also test {{Flip6DefaultCLI}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5671: [FLINK-8906][flip6][tests] also test Flip6DefaultC...

2018-03-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5671#discussion_r174142014
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendCancelTest.java
 ---
@@ -39,13 +47,27 @@
 /**
  * Tests for the CANCEL command.
  */
+@RunWith(Parameterized.class)
 public class CliFrontendCancelTest extends TestLogger {
 
+   @Parameterized.Parameters(name = "Mode = {0}")
--- End diff --

Everything about introducing this parameter is duplicated in all the test 
classes. I would create a common super class that does this parameter iteration 
and implements the `getCli()` and `getConfiguration()` methods, instead of 
having them as static methods.


---


[jira] [Commented] (FLINK-4569) JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent thread.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396968#comment-16396968
 ] 

ASF GitHub Bot commented on FLINK-4569:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5689#discussion_r174140412
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 ---
@@ -119,6 +121,11 @@ public void run() {
lock.release();
 
resumingThread.join();
+
+   Throwable exception = error.get();
+   if (exception != null) {
+   throw new AssertionError(exception);
+   }
--- End diff --

assertNull throws an NPE. This throws the actual exception with its stack trace 
etc. and is the closest equivalent to just throwing the exception.
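
For context, a self-contained sketch (simplified, with made-up names) of the 
forwarding pattern the change implements: the background thread records its 
failure, and the calling thread rethrows it after join() so the original stack 
trace is preserved.

{code}
import java.util.concurrent.atomic.AtomicReference;

public class ForwardBackgroundFailureExample {

    public static void main(String[] args) throws Exception {
        final AtomicReference<Throwable> error = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            try {
                // test logic running on the background thread
                throw new IllegalStateException("simulated failure");
            } catch (Throwable t) {
                error.set(t);   // record instead of silently dying
            }
        });

        worker.start();
        worker.join();

        // Rethrow on the calling thread so the failure is actually reported.
        Throwable exception = error.get();
        if (exception != null) {
            throw new AssertionError(exception);
        }
    }
}
{code}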


> JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent 
> thread.
> ---
>
> Key: FLINK-4569
> URL: https://issues.apache.org/jira/browse/FLINK-4569
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The mentioned test seems to fail frequently, without being detected, because 
> the Assert.fail() is called in a separate thread which doesn't forward 
> exceptions.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/156177995/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5689#discussion_r174140412
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 ---
@@ -119,6 +121,11 @@ public void run() {
lock.release();
 
resumingThread.join();
+
+   Throwable exception = error.get();
+   if (exception != null) {
+   throw new AssertionError(exception);
+   }
--- End diff --

assertNull throws an NPE. This throws the actual exception with its stack trace 
etc. and is the closest equivalent to just throwing the exception.


---


[jira] [Commented] (FLINK-8906) Flip6DefaultCLI is not tested in org.apache.flink.client.cli tests

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396965#comment-16396965
 ] 

ASF GitHub Bot commented on FLINK-8906:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5671#discussion_r174139638
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -567,7 +567,7 @@ public GetClusterStatusResponse getClusterStatus() {
 
@Override
public int getMaxSlots() {
-   return 0;
+   return -1;
--- End diff --

I suggest assigning this magic number to a `static final` variable with a 
descriptive name and using it here and in `CliFrontend`, so that the connection 
is clear.
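
For example (the constant name and placement below are made up here), something 
along these lines would make the contract explicit:

{code}
/** Hypothetical sketch of the suggested named constant, shared by the client and the CLI. */
public final class ClusterClientConstants {

    /** Returned by getMaxSlots() when the maximum number of slots is unknown. */
    public static final int MAX_SLOTS_UNKNOWN = -1;

    private ClusterClientConstants() {
    }
}
{code}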


> Flip6DefaultCLI is not tested in org.apache.flink.client.cli tests
> --
>
> Key: FLINK-8906
> URL: https://issues.apache.org/jira/browse/FLINK-8906
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Various tests in {{org.apache.flink.client.cli}} only test with the 
> {{DefaultCLI}} but should also test {{Flip6DefaultCLI}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5671: [FLINK-8906][flip6][tests] also test Flip6DefaultC...

2018-03-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5671#discussion_r174139638
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -567,7 +567,7 @@ public GetClusterStatusResponse getClusterStatus() {
 
@Override
public int getMaxSlots() {
-   return 0;
+   return -1;
--- End diff --

I suggest assigning this magic number to a `static final` variable with a 
descriptive name and using it here and in `CliFrontend`, so that the connection 
is clear.


---


[jira] [Commented] (FLINK-4569) JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent thread.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396953#comment-16396953
 ] 

ASF GitHub Bot commented on FLINK-4569:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5689#discussion_r174135885
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 ---
@@ -119,6 +121,11 @@ public void run() {
lock.release();
 
resumingThread.join();
+
+   Throwable exception = error.get();
+   if (exception != null) {
+   throw new AssertionError(exception);
+   }
--- End diff --

Is this any different from using `Assert.assertNull(error.get())`? Not that it 
makes any difference, but just to know why you picked throwing the exception 
here. Personally, I think assertNull seems more explicit when you read the 
code, and it is less verbose.


> JobRetrievalITCase.testJobRetrieval() does not forward exceptions to parent 
> thread.
> ---
>
> Key: FLINK-4569
> URL: https://issues.apache.org/jira/browse/FLINK-4569
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The mentioned test seems to fail frequently, without being detected, because 
> the Assert.fail() is called in a separate thread which doesn't forward 
> exceptions.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/156177995/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5689: [FLINK-4569][tests] Respect exceptions thrown in t...

2018-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5689#discussion_r174135885
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 ---
@@ -119,6 +121,11 @@ public void run() {
lock.release();
 
resumingThread.join();
+
+   Throwable exception = error.get();
+   if (exception != null) {
+   throw new AssertionError(exception);
+   }
--- End diff --

Is this any different from using `Assert.assertNull(error.get())`? Not that it 
makes any difference, but just to know why you picked throwing the exception 
here. Personally, I think assertNull seems more explicit when you read the 
code, and it is less verbose.


---


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396949#comment-16396949
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174133415
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

It is not an assumption. It is accessed concurrently from the thread pool. 
Synchronizing on the `kvState` would serialize all accesses, which would slow 
down performance by a lot.


> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174133415
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

It is not an assumption. It is accessed concurrently from the thread pool. 
Synchronizing on the `kvState` would serialize all accesses, which would slow 
down performance by a lot.


---


[jira] [Commented] (FLINK-8802) Concurrent serialization without duplicating serializers in state server.

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396943#comment-16396943
 ] 

ASF GitHub Bot commented on FLINK-8802:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174128377
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Couldn't we synchronize on `kvState` instead of modifying all 
`InternalKvState` implementations?

This seems like a much safer alternative than baking in the assumption that 
`getSerializedValue()` can be called concurrently.



> Concurrent serialization without duplicating serializers in state server.
> -
>
> Key: FLINK-8802
> URL: https://issues.apache.org/jira/browse/FLINK-8802
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The `getSerializedValue()` may be called by multiple threads but serializers 
> are not duplicated, which may lead to exceptions thrown when a serializer is 
> stateful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5691: [FLINK-8802] [QS] Fix concurrent access to non-dup...

2018-03-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5691#discussion_r174128377
  
--- Diff: 
flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
 ---
@@ -78,13 +75,22 @@ public KvStateServerHandler(
final CompletableFuture responseFuture = new 
CompletableFuture<>();
 
try {
-   final InternalKvState kvState = 
registry.getKvState(request.getKvStateId());
+   final KvStateEntry kvState = 
registry.getKvState(request.getKvStateId());
if (kvState == null) {
responseFuture.completeExceptionally(new 
UnknownKvStateIdException(getServerName(), request.getKvStateId()));
} else {
byte[] serializedKeyAndNamespace = 
request.getSerializedKeyAndNamespace();
 
-   byte[] serializedResult = 
kvState.getSerializedValue(serializedKeyAndNamespace);
+   // here we remove any type check...
+   // Ideally we want to keep that the info match 
the state.
+   final InternalKvState state = 
kvState.getState();
+   final KvStateInfo info = 
kvState.getInfoForCurrentThread();
+
+   byte[] serializedResult = 
state.getSerializedValue(
--- End diff --

Couldn't we synchronize on `kvState` instead of modifying all 
`InternalKvState` implementations?

This seems like a much safer alternative than baking in the assumption that 
`getSerializedValue()` can be called concurrently.



---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396934#comment-16396934
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5676
  
CC @tillrohrmann 


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Test
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5676: [FLINK-8910][tests] Automated end-to-end test for local r...

2018-03-13 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5676
  
CC @tillrohrmann 


---


[jira] [Commented] (FLINK-8919) Add KeyedProcessFunctionWIthCleanupState

2018-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16396927#comment-16396927
 ] 

ASF GitHub Bot commented on FLINK-8919:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5680
  
The purpose of unit tests is not only to validate that the new feature 
works as expected but also to ensure that the functionality is not broken by 
future changes.
So even if the code is copied, we should add a test.


> Add KeyedProcessFunctionWIthCleanupState
> 
>
> Key: FLINK-8919
> URL: https://issues.apache.org/jira/browse/FLINK-8919
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Minor
> Fix For: 1.6.0
>
>
> ProcessFunctionWithCleanupState is a useful tool and I think we also need one 
> for the new KeyedProcessFunction API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

