[GitHub] incubator-gearpump pull request #192: [GEARPUMP-320] Handle stashed messages...

2017-06-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-gearpump/pull/192


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...

2017-06-22 Thread codecov-io
Github user codecov-io commented on the issue:

https://github.com/apache/incubator-gearpump/pull/192
  
# 
[Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/192?src=pr=h1)
 Report
> Merging 
[#192](https://codecov.io/gh/apache/incubator-gearpump/pull/192?src=pr=desc) 
into 
[master](https://codecov.io/gh/apache/incubator-gearpump/commit/2877c81b6ffd48b7e781aa2a476728c7252e7224?src=pr=desc)
 will **increase** coverage by `0.16%`.
> The diff coverage is `66.66%`.



```diff
@@Coverage Diff @@
##   master #192  +/-   ##
==
+ Coverage   70.29%   70.45%   +0.16% 
==
  Files 189  189  
  Lines6106 6106  
  Branches  537  537  
==
+ Hits 4292 4302  +10 
+ Misses   1814 1804  -10
```




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...

2017-06-22 Thread huafengw
Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/192
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #192: [GEARPUMP-320] Handle stashed messages...

2017-06-22 Thread manuzhang
GitHub user manuzhang opened a pull request:

https://github.com/apache/incubator-gearpump/pull/192

[GEARPUMP-320] Handle stashed messages after onStart

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the commit message is formatted like:
   `[GEARPUMP-] Meaningful description of pull request` 
 - [ ] Make sure tests pass via `sbt clean test`.
 - [ ] Make sure old documentation affected by the pull request has been 
updated and new documentation added for new functionality. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/manuzhang/incubator-gearpump fix_start

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/192.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #192


commit d2aa227e8ee16476d97843fd6f6379767f87826f
Author: manuzhang 
Date:   2017-06-23T01:30:13Z

[GEARPUMP-320] Handle stashed messages after onStart




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump issue #191: [GEARPUMP-319] Support Sorted Set in Redis

2017-06-22 Thread manuzhang
Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/191
  
R: @manuzhang


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (GEARPUMP-320) Stashed messages are not processed

2017-06-22 Thread Manu Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manu Zhang resolved GEARPUMP-320.
-
   Resolution: Fixed
Fix Version/s: 0.8.4

Issue resolved by pull request 192
[https://github.com/apache/incubator-gearpump/pull/192]

> Stashed messages are not processed 
> ---
>
> Key: GEARPUMP-320
> URL: https://issues.apache.org/jira/browse/GEARPUMP-320
> Project: Apache Gearpump
>  Issue Type: Bug
>  Components: streaming
>Affects Versions: 0.8.3
>Reporter: Manu Zhang
>Assignee: Manu Zhang
> Fix For: 0.8.4
>
>
> This is because stashed messages are handled before _TaskActor.onStart_ 
> creating _Task_.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (GEARPUMP-320) Stashed messages are not processed

2017-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060306#comment-16060306
 ] 

ASF GitHub Bot commented on GEARPUMP-320:
-

Github user asfgit closed the pull request at:

https://github.com/apache/incubator-gearpump/pull/192


> Stashed messages are not processed 
> ---
>
> Key: GEARPUMP-320
> URL: https://issues.apache.org/jira/browse/GEARPUMP-320
> Project: Apache Gearpump
>  Issue Type: Bug
>  Components: streaming
>Affects Versions: 0.8.3
>Reporter: Manu Zhang
>Assignee: Manu Zhang
> Fix For: 0.8.4
>
>
> This is because stashed messages are handled before _TaskActor.onStart_ 
> creating _Task_.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (GEARPUMP-320) Stashed messages are not processed

2017-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060301#comment-16060301
 ] 

ASF GitHub Bot commented on GEARPUMP-320:
-

Github user huafengw commented on the issue:

https://github.com/apache/incubator-gearpump/pull/192
  
+1


> Stashed messages are not processed 
> ---
>
> Key: GEARPUMP-320
> URL: https://issues.apache.org/jira/browse/GEARPUMP-320
> Project: Apache Gearpump
>  Issue Type: Bug
>  Components: streaming
>Affects Versions: 0.8.3
>Reporter: Manu Zhang
>Assignee: Manu Zhang
>
> This is because stashed messages are handled before _TaskActor.onStart_ 
> creating _Task_.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (GEARPUMP-319) Support Sorted Set in Redis

2017-06-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/GEARPUMP-319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060297#comment-16060297
 ] 

ASF GitHub Bot commented on GEARPUMP-319:
-

Github user manuzhang commented on the issue:

https://github.com/apache/incubator-gearpump/pull/191
  
R: @manuzhang


> Support Sorted Set in Redis
> ---
>
> Key: GEARPUMP-319
> URL: https://issues.apache.org/jira/browse/GEARPUMP-319
> Project: Apache Gearpump
>  Issue Type: New Feature
>  Components: core
>Reporter: darion yaphet
>Assignee: darion yaphet
>
> *Sorted Set* is similar with *Set* in Redis with unique values . Each element 
> storage in Sorted Set is associated with a score that is use to sort the 
> element in order .



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] incubator-gearpump pull request #191: [GEARPUMP-319] Support Sorted Set in R...

2017-06-22 Thread darionyaphet
GitHub user darionyaphet opened a pull request:

https://github.com/apache/incubator-gearpump/pull/191

[GEARPUMP-319] Support Sorted Set in Redis

Sorted Set is similar with Set in Redis with unique values . 

Each element storage in Sorted Set is associated with a score that is use 
to sort the element in order .

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/darionyaphet/incubator-gearpump GEARPUMP-319

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-gearpump/pull/191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #191


commit 77d12e97843deae16c2598b5e23bcd4ea0968fed
Author: darionyaphet 
Date:   2017-06-22T08:05:39Z

[GEARPUMP-319] Support Sorted Set in Redis




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (GEARPUMP-319) Support Sorted Set in Redis

2017-06-22 Thread darion yaphet (JIRA)
darion yaphet created GEARPUMP-319:
--

 Summary: Support Sorted Set in Redis
 Key: GEARPUMP-319
 URL: https://issues.apache.org/jira/browse/GEARPUMP-319
 Project: Apache Gearpump
  Issue Type: New Feature
  Components: core
Reporter: darion yaphet
Assignee: darion yaphet


*Sorted Set* is similar with *Set* in Redis with unique values . Each element 
storage in Sorted Set is associated with a score that is use to sort the 
element in order .



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429407
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import 
org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, 
UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in 
memory
+ * after checkpoint state will persist into storage layer and it will 
guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] 
= null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message 
should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals 
= {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in 
`open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, 
Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, 
encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serialized)
+reportCheckpointClock(checkpointTime)
+  }
+}
+  }
+
  

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429421
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import 
org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, 
UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in 
memory
+ * after checkpoint state will persist into storage layer and it will 
guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] 
= null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message 
should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals 
= {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in 
`open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, 
Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, 
encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serialized)
+reportCheckpointClock(checkpointTime)
+  }
+}
+  }
+
  

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428717
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, 
MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends 
StateSpec[ValueState[T]] {
+
+var aCoder: Coder[T] = coder
+
+override def bind(id: String, binder: StateBinder): ValueState[T] = {
+  binder.bindValue(id, this, aCoder)
+}
+
+override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+  if (this.aCoder == null) {
+if (coders(0) != null) {
+  this.aCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (aCoder == null) throw new IllegalStateException(
+"Unable to infer a coder for ValueState and no Coder"
++ " was specified. Please set a coder by either invoking"
++ " StateSpecs.value(Coder valueCoder) or by registering the 
coder in the"
++ " Pipeline's CoderRegistry.")
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
--- End diff --

It's not right in Scala, here without `return`, `true` just becomes a 
useless statement, which means eventually the last line 
`Objects.equals(this.aCoder, that.aCoder)` will be executed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428767
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, 
MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends 
StateSpec[ValueState[T]] {
+
+var aCoder: Coder[T] = coder
+
+override def bind(id: String, binder: StateBinder): ValueState[T] = {
+  binder.bindValue(id, this, aCoder)
+}
+
+override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+  if (this.aCoder == null) {
+if (coders(0) != null) {
+  this.aCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (aCoder == null) throw new IllegalStateException(
+"Unable to infer a coder for ValueState and no Coder"
++ " was specified. Please set a coder by either invoking"
++ " StateSpecs.value(Coder valueCoder) or by registering the 
coder in the"
++ " Pipeline's CoderRegistry.")
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
+
+  if (!(obj.isInstanceOf[ValueStateSpec[T]])) false
+
+  val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+  Objects.equals(this.aCoder, that.aCoder)
+}
+
+override def hashCode(): Int = {
+  Objects.hashCode(this.aCoder)
+}
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends 
StateSpec[BagState[T]] {
+
+private implicit var elemCoder = coder
+
+override def bind(id: String, binder: StateBinder): BagState[T] =
+  binder.bindBag(id, this, elemCoder)
+
+override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+  if (this.elemCoder == null) {
+if (coders(0) != null) {
+  this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (elemCoder == null) {
+throw new IllegalStateException("Unable to infer a coder for 
BagState and no Coder"
+  + " was specified. Please set a coder by either invoking"
+  + " StateSpecs.bag(Coder elemCoder) or by registering the 
coder in the"
+  + " Pipeline's CoderRegistry.");
+  }
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
+
+  if (!obj.isInstanceOf[BagStateSpec[_]]) false
+
+  val that = obj.asInstanceOf[BagStateSpec[_]]
+  Objects.equals(this.elemCoder, that.elemCoder)
+}
+
+override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: 
Coder[V])
+extends StateSpec[MapState[K, V]] {
+
+private implicit var kCoder = keyCoder
+private implicit var vCoder = valueCoder
+
+override def bind(id: String, binder: StateBinder): MapState[K, V] =
+  binder.bindMap(id, this, keyCoder, valueCoder)
+
+override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = 
{
+  if (this.kCoder == null) {
+if (coders(0) != null) {
+  this.kCoder = coders(0).asInstanceOf[Coder[K]]
+}
+  }
+
+  if (this.vCoder == null) {
+if (coders(1) != null) {
+  this.vCoder = 

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428739
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, 
MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends 
StateSpec[ValueState[T]] {
+
+var aCoder: Coder[T] = coder
+
+override def bind(id: String, binder: StateBinder): ValueState[T] = {
+  binder.bindValue(id, this, aCoder)
+}
+
+override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+  if (this.aCoder == null) {
+if (coders(0) != null) {
+  this.aCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (aCoder == null) throw new IllegalStateException(
+"Unable to infer a coder for ValueState and no Coder"
++ " was specified. Please set a coder by either invoking"
++ " StateSpecs.value(Coder valueCoder) or by registering the 
coder in the"
++ " Pipeline's CoderRegistry.")
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
+
+  if (!(obj.isInstanceOf[ValueStateSpec[T]])) false
+
+  val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+  Objects.equals(this.aCoder, that.aCoder)
+}
+
+override def hashCode(): Int = {
+  Objects.hashCode(this.aCoder)
+}
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends 
StateSpec[BagState[T]] {
+
+private implicit var elemCoder = coder
+
+override def bind(id: String, binder: StateBinder): BagState[T] =
+  binder.bindBag(id, this, elemCoder)
+
+override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+  if (this.elemCoder == null) {
+if (coders(0) != null) {
+  this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (elemCoder == null) {
+throw new IllegalStateException("Unable to infer a coder for 
BagState and no Coder"
+  + " was specified. Please set a coder by either invoking"
+  + " StateSpecs.bag(Coder elemCoder) or by registering the 
coder in the"
+  + " Pipeline's CoderRegistry.");
+  }
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
--- End diff --

The same


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428752
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util.Objects
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.{BagState, 
MapState, SetState, ValueState}
+
+/**
+ * a utility object for StateSpec
+ */
+object StateSpecs {
+
+  private class ValueStateSpec[T](coder: Coder[T]) extends 
StateSpec[ValueState[T]] {
+
+var aCoder: Coder[T] = coder
+
+override def bind(id: String, binder: StateBinder): ValueState[T] = {
+  binder.bindValue(id, this, aCoder)
+}
+
+override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = {
+  if (this.aCoder == null) {
+if (coders(0) != null) {
+  this.aCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (aCoder == null) throw new IllegalStateException(
+"Unable to infer a coder for ValueState and no Coder"
++ " was specified. Please set a coder by either invoking"
++ " StateSpecs.value(Coder valueCoder) or by registering the 
coder in the"
++ " Pipeline's CoderRegistry.")
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
+
+  if (!(obj.isInstanceOf[ValueStateSpec[T]])) false
+
+  val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]]
+  Objects.equals(this.aCoder, that.aCoder)
+}
+
+override def hashCode(): Int = {
+  Objects.hashCode(this.aCoder)
+}
+  }
+
+  private class BagStateSpec[T](coder: Coder[T]) extends 
StateSpec[BagState[T]] {
+
+private implicit var elemCoder = coder
+
+override def bind(id: String, binder: StateBinder): BagState[T] =
+  binder.bindBag(id, this, elemCoder)
+
+override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = {
+  if (this.elemCoder == null) {
+if (coders(0) != null) {
+  this.elemCoder = coders(0).asInstanceOf[Coder[T]]
+}
+  }
+}
+
+override def finishSpecifying: Unit = {
+  if (elemCoder == null) {
+throw new IllegalStateException("Unable to infer a coder for 
BagState and no Coder"
+  + " was specified. Please set a coder by either invoking"
+  + " StateSpecs.bag(Coder elemCoder) or by registering the 
coder in the"
+  + " Pipeline's CoderRegistry.");
+  }
+}
+
+override def equals(obj: Any): Boolean = {
+  if (obj == this) true
+
+  if (!obj.isInstanceOf[BagStateSpec[_]]) false
+
+  val that = obj.asInstanceOf[BagStateSpec[_]]
+  Objects.equals(this.elemCoder, that.elemCoder)
+}
+
+override def hashCode(): Int = Objects.hash(getClass, elemCoder)
+  }
+
+  private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: 
Coder[V])
+extends StateSpec[MapState[K, V]] {
+
+private implicit var kCoder = keyCoder
+private implicit var vCoder = valueCoder
+
+override def bind(id: String, binder: StateBinder): MapState[K, V] =
+  binder.bindMap(id, this, keyCoder, valueCoder)
+
+override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = 
{
+  if (this.kCoder == null) {
+if (coders(0) != null) {
+  this.kCoder = coders(0).asInstanceOf[Coder[K]]
+}
+  }
+
+  if (this.vCoder == null) {
+if (coders(1) != null) {
+  this.vCoder = 

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429442
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import 
org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, 
UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in 
memory
+ * after checkpoint state will persist into storage layer and it will 
guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] 
= null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message 
should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals 
= {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in 
`open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, 
Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, 
encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serialized)
+reportCheckpointClock(checkpointTime)
+  }
+}
+  }
+
  

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430323
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala
 ---
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, 
OutputStream}
+import java.lang.Iterable
+import java.util
+import java.util.Map.Entry
+import java.util._
+import java.util.Objects
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, 
MapCoder, SetCoder}
+import org.apache.gearpump.streaming.refactor.state.{StateBinder, 
StateNamespace, StateSpec, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api._
+import org.apache.gearpump.util.LogUtil
+
+/**
+ *  a heap memory backend StateInternals implementation
+ *  it will cache state in heap memory with a Guava's Table structure
+ *  there are three concept :
+ *  (1) namespace mapping table's row id
+ *  (2) tag mapping table's field
+ *  (3) state mapping table's cell value
+ */
+class HeapStateInternals[K](key: K, stateTable: Table[String, String, 
Array[Byte]])
+  extends StateInternals {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private val k: K = key
--- End diff --

redundant


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429491
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala
 ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import 
org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, 
UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in 
memory
+ * after checkpoint state will persist into storage layer and it will 
guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] 
= null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message 
should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals 
= {
+if (!inited) {
+  throw new RuntimeException(" please init state access object in 
`open` method! ")
+}
+if (encodedKeyStateMap == null) {
+  encodedKeyStateMap = new util.HashMap[String, Table[String, String, 
Array[Byte]]]()
+}
+
+val factory = new HeapStateInternalsFactory[KT](keyCoder, 
encodedKeyStateMap)
+factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+// recover state from snapshot
+LOG.info("[onStart] - recover from snapshot")
+val timestamp = startTime.toEpochMilli
+checkpointManager
+  .recover(timestamp)
+  .foreach(recoverState(timestamp, _))
+reportCheckpointClock(timestamp)
+inited = true
+open
+  }
+
+  final override def onNext(message: Message): Unit = {
+checkpointManager.update(message.timestamp.toEpochMilli)
+invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+  checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+val serialized = snapshot
+checkpointManager.checkpoint(checkpointTime, serialized)
+reportCheckpointClock(checkpointTime)
+  }
+}
+  }
+
  

[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...

2017-06-22 Thread huafengw
Github user huafengw commented on a diff in the pull request:

https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430243
  
--- Diff: 
streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util
+import java.util.Map.Entry
+import java.util.{ArrayList, HashSet, List, Set}
+import java.lang.Iterable
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import 
org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
+import org.apache.gearpump.streaming.refactor.state.api._
+
+/**
+ * a no namespace (global) in memory state internal.
+ * it is a default implementation inspired by Apache Beam.
+ * but do not recommend use it!
+ */
+class InMemoryGlobalStateInternals[K] protected(key: K) extends 
StateInternals {
+
+  private val k: K = key
--- End diff --

Look like `k` is redundant since you already have `key`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---