[GitHub] flink issue #6357: [FLINK-9720] [Scheduler] Add ResourceAttribute and MesosR...

2018-07-20 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6357
  
@tillrohrmann Can you help to review this?


---


[GitHub] flink pull request #6357: [FLINK-9720] [Scheduler] Add ResourceAttribute and...

2018-07-17 Thread liurenjie1024
GitHub user liurenjie1024 opened a pull request:

https://github.com/apache/flink/pull/6357

[FLINK-9720] [Scheduler] Add ResourceAttribute and MesosResourceAttribute.

## What is the purpose of the change

*This pull request is one of PRs for adding tag support to flink scheduler. 
*


## Brief change log

  - *Add ResourceAttribute interface.*
  - *Add MesosResourceAttribute class.*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink FLINK-9720

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6357.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6357


commit 80d80a719b5825d59bfcd07a2333e78e33204923
Author: liurenjie1024 
Date:   2018-07-16T15:09:32Z

Add ResourceAttribute and MesosResourceAttribute.




---


[GitHub] flink issue #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/6214
  
@tillrohrmann This is from my initial design, and since the design has 
changed, we can close this now.


---


[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.

2018-07-01 Thread liurenjie1024
Github user liurenjie1024 closed the pull request at:

https://github.com/apache/flink/pull/6214


---


[GitHub] flink pull request #6214: [FLINK-9669] Add assignment store interface.

2018-06-27 Thread liurenjie1024
GitHub user liurenjie1024 opened a pull request:

https://github.com/apache/flink/pull/6214

[FLINK-9669] Add assignment store interface.

## What is the purpose of the change

This pull requests is part of process to enable task manager isolation in 
flink 1.5 session mode.


## Brief change log
  - Add task manager assignment class.
  - Add task manager assignment store interface.


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink FLINK-9669

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6214.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6214


commit 2b6156459e7ad45cfd33147cfa27a6c7386dd4c4
Author: liurenjie1024 
Date:   2018-06-27T09:08:11Z

Add assignment store interface.




---


[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

2018-05-25 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr  I store each column separately to avoid encoding and schema 
management problems, also to reduce unnecessary data transfer when querying 
data. Issue multiple requests for multiple fields is one of the drawbacks.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-25 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190859490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, 
I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
 ctx: KeyedProcessFunction[K, I, O]#Context,
 currentTime: Long): Unit = {
-if (stateCleaningEnabled) {
+registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

The reason why I put it in the same PR is that I don't want it to block 
this PR, but I also agree that we should move it to a separate one.


---


[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

2018-04-23 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@suez1224 Conflict resolved.


---


[GitHub] flink issue #5704: [FLINK-8852] [sql-client] Add FLIP-6 support to SQL Clien...

2018-03-24 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5704
  
LGTM


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-20 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175680499
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
+  }
+
+  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = {
+new QueryableTableSink(this.namePrefix, this.queryConfig)
+  }
+}
+
+class RowKeySelector(
+  private val keyIndices: Array[Int],
+  @transient private val returnType: TypeInformation[Row])
+  extends KeySelector[JTuple2[JBool, Row], Row]
+with ResultTypeQueryable[Row] {
+
+  override def getKey(value: JTuple2[JBool, Row]): Row = {
+val keys = keyIndices
+
+val srcRow = value.f1
+
+val destRow = new Row(keys.length)
+var i = 0
+while (i < keys.length) {
+  destRow.setField(i, srcRow.getField(keys(i)))
+  i += 1
+}
+
+destRow
+  }
+
+  override def getProducedType: TypeInformation[Row] = returnType
+}
+
+class QueryableStateProcessFunction(
+  private val namePrefix: String,
+  private val queryConfig: StreamQueryConfig,
+  private val keyNames: Array[String],
+  private val fieldNames: Array[String],
+  private val fieldTypes: Array[TypeInformation[_]])
+  extends ProcessFunctionWithCleanupState[JTuple2[JBool, Row], 
Void](queryConfig) {
+
+  @transient private var states = Array[ValueState[AnyRef]]()
+  @transient private var nonKeyIndices = Array[Int]()
+
+  override def op

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-20 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r175680339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
--- End diff --

Java doc added.


---


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-16 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
Can anyone help to merge this?


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-15 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174990348
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(
+  private val namePrefix: String,
--- End diff --

Fixed.


---


[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

2018-03-14 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@xccui Thanks for you suggestions.

1. Add an it test for this is difficult. Think about it, I need to ensure 
that elements to be processed while keeping the job running, and it's difficult 
to achieve this. But I will do some manual test for that.
2. I'll add doc for that.
3. I'll squash the commits when review is done.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-14 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366980
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, 
RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]), 
TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new 
RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), 
Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction("test", queryConfig, 
keys, fieldNames, fieldTypes)
+
+val operator = new LegacyKeyedProcessOperator[Row, JTuple2[JBool, 
Row], Void](func)
--- End diff --

Same as above.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-14 Thread liurenjie1024
Github user liurenjie1024 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r174366242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.ProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
+
+class QueryableTableSink(private val namePrefix: String,
+ private val queryConfig: StreamQueryConfig)
+  extends UpsertStreamTableSink[Row]
+  with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  /**
+* Configures the unique key fields of the [[Table]] to write.
+* The method is called after [[TableSink.configure()]].
+*
+* The keys array might be empty, if the table consists of a single 
(updated) record.
+* If the table does not have a key and is append-only, the keys 
attribute is null.
+*
+* @param keys the field names of the table's keys, an empty array if 
the table has a single
+* row, and null if the table is append-only and has no key.
+*/
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  /**
+* Specifies whether the [[Table]] to write is append-only or not.
+*
+* @param isAppendOnly true if the table is append-only, false 
otherwise.
+*/
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only tables " +
+"as the table would grow infinitely")
+}
+  }
+
+  /** Returns the requested record type */
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  /** Emits the DataStream. */
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes)
+
+dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType))
+  .process(processFunction)
--- End diff --

I'm waiting another [PR](https://github.com/apache/flink/pull/5680) to be 
merged.


---


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-14 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
@bowenli86 @fhueske UT added.

This class is a utility class which can be the base class of many process 
function implementations in flink table. Its counterpart for the legacy 
`ProcessFunction `interface, `ProcessFunctionWithCleanupState`, has been 
inherited by many implementations. In fact, my other 
[PR](https://github.com/apache/flink/pull/5688) depends on this.


---


[GitHub] flink issue #5688: [FLINK-6968][Table API & SQL] Add Queryable table sink.

2018-03-13 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5688
  
@twalthr Please help to review this.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-03-13 Thread liurenjie1024
GitHub user liurenjie1024 opened a pull request:

https://github.com/apache/flink/pull/5688

[FLINK-6968][Table API & SQL] Add Queryable table sink.

## What is the purpose of the change

Streaming tables with unique key are continuously updated. For example 
queries with a non-windowed aggregation generate such tables. Commonly, such 
updating tables are emitted via an upsert table sink to an external datastore 
(k-v store, database) to make it accessible to applications.

## Brief change log

  - *Add a QueryableStateTableSink.*
  - *States are queryable.*

## Verifying this change

This change added tests and can be verified as follows:
  - *Added test that validates that states will be stored.*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and use 
QueryableStateClient to test that.*


## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink QueryableTableSink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5688.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5688


commit 0663e550216dfef8bf205f90d5ac8a0e7e77a42b
Author: liurenjie1024 <liurenjie2008@...>
Date:   2018-03-12T07:32:44Z

Code complete

commit 6d62d53f0bae65249ab69bddf7932e62ae1e7897
Author: liurenjie1024 <liurenjie2008@...>
Date:   2018-03-13T09:44:30Z

Add test

commit c9ffa6ecdd638a497b60f3f063b2d352b1b98059
Author: liurenjie1024 <liurenjie2008@...>
Date:   2018-03-13T10:43:19Z

Fix test style




---


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-13 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
Can anyone help to merge this?


---


[GitHub] flink issue #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

2018-03-12 Thread liurenjie1024
Github user liurenjie1024 commented on the issue:

https://github.com/apache/flink/pull/5680
  
@bowenli86 This is a trivial change and most the code is copied from the 
non keyed counterpart, so I don't think we need a test.


---


[GitHub] flink pull request #5680: [FLINK-8919] Add KeyedProcessFunctionWithCleanupSt...

2018-03-12 Thread liurenjie1024
GitHub user liurenjie1024 opened a pull request:

https://github.com/apache/flink/pull/5680

[FLINK-8919] Add KeyedProcessFunctionWithCleanupState.

## What is the purpose of the change

*Add ProcessFunctionWithCleanupState's counterpart for 
KeyedProcessFunction.*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liurenjie1024/flink 
KeyedProcessFunctionWithCleanupState

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5680.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5680


commit 26915427ba2f3c8e131cbd788c7e4967e69ae2c0
Author: liurenjie1024 <liurenjie2008@...>
Date:   2018-03-12T07:43:26Z

KeyedProcessFunctionWithCleanupState




---