[GitHub] incubator-gearpump pull request #192: [GEARPUMP-320] Handle stashed messages...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-gearpump/pull/192 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...
Github user codecov-io commented on the issue: https://github.com/apache/incubator-gearpump/pull/192 # [Codecov](https://codecov.io/gh/apache/incubator-gearpump/pull/192?src=pr=h1) Report > Merging [#192](https://codecov.io/gh/apache/incubator-gearpump/pull/192?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-gearpump/commit/2877c81b6ffd48b7e781aa2a476728c7252e7224?src=pr=desc) will **increase** coverage by `0.16%`. > The diff coverage is `66.66%`.
```diff
@@            Coverage Diff             @@
##           master     #192      +/-   ##
==========================================
+ Coverage   70.29%   70.45%   +0.16%
==========================================
  Files         189      189
  Lines        6106     6106
  Branches      537      537
==========================================
+ Hits         4292     4302      +10
+ Misses       1814     1804      -10
```
[GitHub] incubator-gearpump issue #192: [GEARPUMP-320] Handle stashed messages after ...
Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/192 +1
[GitHub] incubator-gearpump pull request #192: [GEARPUMP-320] Handle stashed messages...
GitHub user manuzhang opened a pull request: https://github.com/apache/incubator-gearpump/pull/192 [GEARPUMP-320] Handle stashed messages after onStart Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the commit message is formatted like: `[GEARPUMP-] Meaningful description of pull request` - [ ] Make sure tests pass via `sbt clean test`. - [ ] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. You can merge this pull request into a Git repository by running: $ git pull https://github.com/manuzhang/incubator-gearpump fix_start Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/192.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #192 commit d2aa227e8ee16476d97843fd6f6379767f87826f Author: manuzhang Date: 2017-06-23T01:30:13Z [GEARPUMP-320] Handle stashed messages after onStart
[GitHub] incubator-gearpump issue #191: [GEARPUMP-319] Support Sorted Set in Redis
Github user manuzhang commented on the issue: https://github.com/apache/incubator-gearpump/pull/191 R: @manuzhang
[jira] [Resolved] (GEARPUMP-320) Stashed messages are not processed
[ https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manu Zhang resolved GEARPUMP-320. - Resolution: Fixed Fix Version/s: 0.8.4 Issue resolved by pull request 192 [https://github.com/apache/incubator-gearpump/pull/192] > Stashed messages are not processed > --- > > Key: GEARPUMP-320 > URL: https://issues.apache.org/jira/browse/GEARPUMP-320 > Project: Apache Gearpump > Issue Type: Bug > Components: streaming >Affects Versions: 0.8.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 0.8.4 > > > This is because stashed messages are handled before _TaskActor.onStart_ > creates the _Task_. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
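The fix is about ordering: messages that reach the task actor before it has started are stashed, and they should be replayed only after `onStart` has created the `Task`. A minimal sketch of that ordering, assuming a plain Scala class in place of an Akka actor; `StashingTaskActor`, `receive`, and `processed` are hypothetical names, not Gearpump's `TaskActor` API:

```scala
import scala.collection.mutable

// Hypothetical model, not Gearpump's TaskActor: messages that arrive before
// onStart are stashed and replayed only after the task has been created.
final class StashingTaskActor {
  private val stash = mutable.Queue.empty[String]
  // Stands in for the Task that onStart creates; None until started.
  private var task: Option[mutable.ArrayBuffer[String]] = None

  def receive(msg: String): Unit = task match {
    case Some(t) => t += msg           // task exists: process immediately
    case None    => stash.enqueue(msg) // not started yet: defer
  }

  def onStart(): Unit = {
    task = Some(mutable.ArrayBuffer.empty[String])  // 1. create the task
    while (stash.nonEmpty) receive(stash.dequeue()) // 2. then replay, in arrival order
  }

  /** Messages the task has processed so far. */
  def processed: List[String] = task.map(_.toList).getOrElse(Nil)
}
```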
[jira] [Commented] (GEARPUMP-320) Stashed messages are not processed
[ https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060306#comment-16060306 ] ASF GitHub Bot commented on GEARPUMP-320: - Github user asfgit closed the pull request at: https://github.com/apache/incubator-gearpump/pull/192 > Stashed messages are not processed > --- > > Key: GEARPUMP-320 > URL: https://issues.apache.org/jira/browse/GEARPUMP-320 > Project: Apache Gearpump > Issue Type: Bug > Components: streaming >Affects Versions: 0.8.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > Fix For: 0.8.4 > > > This is because stashed messages are handled before _TaskActor.onStart_ > creates the _Task_.
[jira] [Commented] (GEARPUMP-320) Stashed messages are not processed
[ https://issues.apache.org/jira/browse/GEARPUMP-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060301#comment-16060301 ] ASF GitHub Bot commented on GEARPUMP-320: - Github user huafengw commented on the issue: https://github.com/apache/incubator-gearpump/pull/192 +1 > Stashed messages are not processed > --- > > Key: GEARPUMP-320 > URL: https://issues.apache.org/jira/browse/GEARPUMP-320 > Project: Apache Gearpump > Issue Type: Bug > Components: streaming >Affects Versions: 0.8.3 >Reporter: Manu Zhang >Assignee: Manu Zhang > > This is because stashed messages are handled before _TaskActor.onStart_ > creates the _Task_.
[jira] [Commented] (GEARPUMP-319) Support Sorted Set in Redis
[ https://issues.apache.org/jira/browse/GEARPUMP-319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16060297#comment-16060297 ] ASF GitHub Bot commented on GEARPUMP-319: - Github user manuzhang commented on the issue: https://github.com/apache/incubator-gearpump/pull/191 R: @manuzhang > Support Sorted Set in Redis > --- > > Key: GEARPUMP-319 > URL: https://issues.apache.org/jira/browse/GEARPUMP-319 > Project: Apache Gearpump > Issue Type: New Feature > Components: core >Reporter: darion yaphet >Assignee: darion yaphet > > *Sorted Set* is similar to *Set* in Redis: a collection of unique values. Each element > stored in a Sorted Set is associated with a score that is used to keep the > elements in sorted order.
[GitHub] incubator-gearpump pull request #191: [GEARPUMP-319] Support Sorted Set in R...
GitHub user darionyaphet opened a pull request: https://github.com/apache/incubator-gearpump/pull/191 [GEARPUMP-319] Support Sorted Set in Redis Sorted Set is similar to Set in Redis: a collection of unique values. Each element stored in a Sorted Set is associated with a score that is used to keep the elements in sorted order. You can merge this pull request into a Git repository by running: $ git pull https://github.com/darionyaphet/incubator-gearpump GEARPUMP-319 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-gearpump/pull/191.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #191 commit 77d12e97843deae16c2598b5e23bcd4ea0968fed Author: darionyaphet Date: 2017-06-22T08:05:39Z [GEARPUMP-319] Support Sorted Set in Redis
[jira] [Created] (GEARPUMP-319) Support Sorted Set in Redis
darion yaphet created GEARPUMP-319: -- Summary: Support Sorted Set in Redis Key: GEARPUMP-319 URL: https://issues.apache.org/jira/browse/GEARPUMP-319 Project: Apache Gearpump Issue Type: New Feature Components: core Reporter: darion yaphet Assignee: darion yaphet *Sorted Set* is similar to *Set* in Redis: a collection of unique values. Each element stored in a Sorted Set is associated with a score that is used to keep the elements in sorted order.
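To make the requested semantics concrete, here is a minimal in-memory model of a Redis Sorted Set, assuming only the behaviour described in the issue plus standard Redis rules: re-adding a member updates its score, and members with equal scores are ordered by member. `SortedSetModel`, `zadd`, `zscore`, and `zrange` are hypothetical names that echo the Redis commands; this is not a Redis client and not the connector API proposed in pull request #191.

```scala
import scala.collection.immutable.TreeSet

// Hypothetical in-memory model of Redis Sorted Set semantics; not a Redis
// client and not the Gearpump connector API from pull request #191.
final class SortedSetModel {
  // Order by score first, then by member name for equal scores.
  // Defined before `ordered`, which needs it during initialisation.
  private implicit val byScoreThenMember: Ordering[(Double, String)] =
    new Ordering[(Double, String)] {
      def compare(a: (Double, String), b: (Double, String)): Int = {
        val byScore = java.lang.Double.compare(a._1, b._1)
        if (byScore != 0) byScore else a._2.compareTo(b._2)
      }
    }

  private var scores = Map.empty[String, Double]        // member -> score (uniqueness)
  private var ordered = TreeSet.empty[(Double, String)] // (score, member), kept sorted

  /** Adds a member or updates its score; true only if the member is new (like ZADD's reply). */
  def zadd(score: Double, member: String): Boolean = {
    val previous = scores.get(member)
    previous.foreach(old => ordered -= ((old, member)))
    scores += (member -> score)
    ordered += ((score, member))
    previous.isEmpty
  }

  def zscore(member: String): Option[Double] = scores.get(member)

  /** Members by rank from start to stop, inclusive (non-negative indexes only). */
  def zrange(start: Int, stop: Int): List[String] =
    ordered.toList.slice(start, stop + 1).map(_._2)
}
```

Keeping a member-to-score map next to the sorted set is what makes a score update possible: the old `(score, member)` entry has to be located and removed before the new one is inserted.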
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429407 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.refactor.state + +import java.io._ +import java.time.Instant +import java.util +import java.util.Map + +import com.google.common.collect.Table +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.refactor.coder._ +import org.apache.gearpump.streaming.refactor.state.api.StateInternals +import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory +import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} +import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock} +import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +object StatefulTask { + val LOG = LogUtil.getLogger(getClass) +} + +/** + * stateful task that support state access and all state will be backed in memory + * after checkpoint state will persist into storage layer and it will guarantee + * 'exactly-once' process semantic + * + */ +abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig) + extends Task(taskContext, conf) { + + import taskContext._ + + val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( +PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get + val checkpointStore = checkpointStoreFactory.getCheckpointStore( +s"app$appId-task${taskId.processorId}_${taskId.index}") + val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get + val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) + + var inited = false + + // core state data + var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null + + /** + * subclass should override this method if they want to init state api + */ + def open: Unit = {} + + /** + * Subclass should override this method to specify how a new message should update state + */ + def 
invoke(message: Message): Unit + + def close: Unit = {} + + def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = { +if (!inited) { + throw new RuntimeException(" please init state access object in `open` method! ") +} +if (encodedKeyStateMap == null) { + encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]() +} + +val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap) +factory.stateInternalsForKey(key) + } + + final override def onStart(startTime: Instant): Unit = { +// recover state from snapshot +LOG.info("[onStart] - recover from snapshot") +val timestamp = startTime.toEpochMilli +checkpointManager + .recover(timestamp) + .foreach(recoverState(timestamp, _)) +reportCheckpointClock(timestamp) +inited = true +open + } + + final override def onNext(message: Message): Unit = { +checkpointManager.update(message.timestamp.toEpochMilli) +invoke(message) + } + + final override def onWatermarkProgress(watermark: Instant): Unit = { +if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) { + checkpointManager.getCheckpointTime.foreach { checkpointTime => +val serialized = snapshot +checkpointManager.checkpoint(checkpointTime, serialized) +reportCheckpointClock(checkpointTime) + } +} + } +
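The lifecycle in the quoted `StatefulTask` depends on ordering. `onStart` recovers state, sets `inited = true`, and only then calls `open`, so state may be accessed from `open` but not earlier. A simplified sketch of that guard, assuming an in-memory map in place of `StateInternals` and leaving checkpointing out entirely; `LifecycleSketch`, `state`, `put`, and `KeyCounter` are hypothetical:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the quoted StatefulTask lifecycle: an
// in-memory map replaces StateInternals and checkpointing is left out.
abstract class LifecycleSketch {
  private var inited = false
  private val store = mutable.Map.empty[String, Long]

  /** Override to initialise state; runs after `inited` is set. */
  def open(): Unit = {}

  /** Override to define how a message updates state. */
  def invoke(key: String): Unit

  protected def state(key: String): Long = {
    if (!inited) {
      throw new IllegalStateException("please init state access object in `open` method!")
    }
    store.getOrElse(key, 0L)
  }

  protected def put(key: String, value: Long): Unit = {
    state(key) // reuse the same guard before writing
    store(key) = value
  }

  final def onStart(): Unit = {
    // (snapshot recovery would happen here)
    inited = true
    open()
  }

  final def onNext(key: String): Unit = invoke(key)
}

// Hypothetical subclass: counts messages per key.
final class KeyCounter extends LifecycleSketch {
  override def open(): Unit = put("opened", 1L)
  override def invoke(key: String): Unit = put(key, state(key) + 1)
  def count(key: String): Long = state(key)
}
```

In the sketch, calling `onNext` before `onStart` throws. This mirrors the `RuntimeException` that the quoted `getStateInternals` raises while `inited` is still false.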
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429421 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala --- @@ -0,0 +1,179 @@
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428717 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true --- End diff -- This is not right in Scala: without `return`, `true` here is just a discarded statement, so execution continues and eventually the last line, `Objects.equals(this.aCoder, that.aCoder)`, is evaluated.
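One way to write `equals` so that every branch contributes to the result is a single `match` expression, which needs no `return`. In the quoted version, an argument of another type falls past the discarded `false` to `asInstanceOf`, which throws `ClassCastException` instead of returning `false`. The sketch also uses `eq` for the identity check, because `==` inside `equals` dispatches back to `equals`. `Coder` here is a hypothetical stand-in trait, and this `ValueStateSpec` keeps only the coder field from the quoted class:

```scala
import java.util.Objects

// Hypothetical stand-in for the PR's Coder type.
trait Coder[T]

// Keeps only the coder field of the quoted ValueStateSpec.
final class ValueStateSpec[T](val aCoder: Coder[T]) {
  // The whole body is one expression, so each branch is the method's result.
  override def equals(obj: Any): Boolean = obj match {
    case that: ValueStateSpec[_] =>
      (this eq that) || Objects.equals(this.aCoder, that.aCoder) // eq: reference identity, no recursion
    case _ =>
      false // other types and null
  }

  override def hashCode(): Int = Objects.hashCode(aCoder)
}
```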
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428767 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!(obj.isInstanceOf[ValueStateSpec[T]])) false + + val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]] + Objects.equals(this.aCoder, that.aCoder) +} + +override def hashCode(): Int = { + Objects.hashCode(this.aCoder) +} + } + + private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] { + +private implicit var elemCoder = coder + +override def bind(id: String, binder: StateBinder): BagState[T] = + binder.bindBag(id, this, elemCoder) + +override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = { + if (this.elemCoder == null) { +if (coders(0) != null) { + this.elemCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (elemCoder == null) { +throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!obj.isInstanceOf[BagStateSpec[_]]) false + + val that = obj.asInstanceOf[BagStateSpec[_]] + Objects.equals(this.elemCoder, that.elemCoder) +} + +override def hashCode(): Int = Objects.hash(getClass, elemCoder) + } + + private class MapStateSpec[K, V](keyCoder: Coder[K], valueCoder: Coder[V]) +extends StateSpec[MapState[K, V]] { + +private implicit var kCoder = keyCoder +private implicit var vCoder = valueCoder + +override def bind(id: String, binder: StateBinder): MapState[K, V] = + binder.bindMap(id, this, keyCoder, valueCoder) + +override def offerCoders(coders: Array[Coder[MapState[K, V]]]): Unit = { + if (this.kCoder == null) { +if (coders(0) != null) { + this.kCoder = coders(0).asInstanceOf[Coder[K]] +} + } + + if (this.vCoder == null) { +if (coders(1) != null) { + this.vCoder =
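The `offerCoders`/`finishSpecifying` pair quoted above follows one rule. An explicitly supplied coder wins, an offered coder fills a gap, and specification fails if a gap remains. A minimal sketch of that rule for a two-coder spec, assuming `Option` arguments instead of the quoted `Array[Coder[...]]`; `Coder` and `MapSpecSketch` are hypothetical. The sketch's `coders` accessor reads the mutable fields. By contrast, in the quoted `MapStateSpec`, `bind` passes the constructor parameters `keyCoder` and `valueCoder`, so a coder filled in later by `offerCoders` would not reach `bindMap`.

```scala
// Hypothetical stand-in for the PR's Coder type.
trait Coder[T]

// Explicit coders win; offered coders only fill gaps; finishSpecifying fails
// if a gap remains.
final class MapSpecSketch[K, V](keyCoder: Coder[K], valueCoder: Coder[V]) {
  private var kCoder: Coder[K] = keyCoder
  private var vCoder: Coder[V] = valueCoder

  def offerCoders(offeredKey: Option[Coder[K]], offeredValue: Option[Coder[V]]): Unit = {
    if (kCoder == null) offeredKey.foreach(c => kCoder = c)
    if (vCoder == null) offeredValue.foreach(c => vCoder = c)
  }

  def finishSpecifying(): Unit =
    if (kCoder == null || vCoder == null) {
      throw new IllegalStateException("Unable to infer a coder for MapState and no Coder was specified.")
    }

  /** The coders a bind step should use: the fields, not the constructor arguments. */
  def coders: (Coder[K], Coder[V]) = (kCoder, vCoder)
}
```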
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428739 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.refactor.state + +import java.util.Objects + +import org.apache.gearpump.streaming.refactor.coder.Coder +import org.apache.gearpump.streaming.refactor.state.api.{BagState, MapState, SetState, ValueState} + +/** + * a utility object for StateSpec + */ +object StateSpecs { + + private class ValueStateSpec[T](coder: Coder[T]) extends StateSpec[ValueState[T]] { + +var aCoder: Coder[T] = coder + +override def bind(id: String, binder: StateBinder): ValueState[T] = { + binder.bindValue(id, this, aCoder) +} + +override def offerCoders(coders: Array[Coder[ValueState[T]]]): Unit = { + if (this.aCoder == null) { +if (coders(0) != null) { + this.aCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (aCoder == null) throw new IllegalStateException( +"Unable to infer a coder for ValueState and no Coder" ++ " was specified. 
Please set a coder by either invoking" ++ " StateSpecs.value(Coder valueCoder) or by registering the coder in the" ++ " Pipeline's CoderRegistry.") +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true + + if (!(obj.isInstanceOf[ValueStateSpec[T]])) false + + val that: ValueStateSpec[_] = obj.asInstanceOf[ValueStateSpec[_]] + Objects.equals(this.aCoder, that.aCoder) +} + +override def hashCode(): Int = { + Objects.hashCode(this.aCoder) +} + } + + private class BagStateSpec[T](coder: Coder[T]) extends StateSpec[BagState[T]] { + +private implicit var elemCoder = coder + +override def bind(id: String, binder: StateBinder): BagState[T] = + binder.bindBag(id, this, elemCoder) + +override def offerCoders(coders: Array[Coder[BagState[T]]]): Unit = { + if (this.elemCoder == null) { +if (coders(0) != null) { + this.elemCoder = coders(0).asInstanceOf[Coder[T]] +} + } +} + +override def finishSpecifying: Unit = { + if (elemCoder == null) { +throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" + + " was specified. Please set a coder by either invoking" + + " StateSpecs.bag(Coder elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } +} + +override def equals(obj: Any): Boolean = { + if (obj == this) true --- End diff -- The same
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123428752 --- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StateSpecs.scala --- @@ -0,0 +1,208 @@
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429442
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.io._
+import java.time.Instant
+import java.util
+import java.util.Map
+
+import com.google.common.collect.Table
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+import org.apache.gearpump.streaming.refactor.state.heap.HeapStateInternalsFactory
+import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
+import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+
+object StatefulTask {
+  val LOG = LogUtil.getLogger(getClass)
+}
+
+/**
+ * stateful task that support state access and all state will be backed in memory
+ * after checkpoint state will persist into storage layer and it will guarantee
+ * 'exactly-once' process semantic
+ *
+ */
+abstract class StatefulTask(taskContext: TaskContext, conf: UserConfig)
+  extends Task(taskContext, conf) {
+
+  import taskContext._
+
+  val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
+    PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+    s"app$appId-task${taskId.processorId}_${taskId.index}")
+  val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
+  val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
+
+  var inited = false
+
+  // core state data
+  var encodedKeyStateMap: Map[String, Table[String, String, Array[Byte]]] = null
+
+  /**
+   * subclass should override this method if they want to init state api
+   */
+  def open: Unit = {}
+
+  /**
+   * Subclass should override this method to specify how a new message should update state
+   */
+  def invoke(message: Message): Unit
+
+  def close: Unit = {}
+
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals = {
+    if (!inited) {
+      throw new RuntimeException(" please init state access object in `open` method! ")
+    }
+    if (encodedKeyStateMap == null) {
+      encodedKeyStateMap = new util.HashMap[String, Table[String, String, Array[Byte]]]()
+    }
+
+    val factory = new HeapStateInternalsFactory[KT](keyCoder, encodedKeyStateMap)
+    factory.stateInternalsForKey(key)
+  }
+
+  final override def onStart(startTime: Instant): Unit = {
+    // recover state from snapshot
+    LOG.info("[onStart] - recover from snapshot")
+    val timestamp = startTime.toEpochMilli
+    checkpointManager
+      .recover(timestamp)
+      .foreach(recoverState(timestamp, _))
+    reportCheckpointClock(timestamp)
+    inited = true
+    open
+  }
+
+  final override def onNext(message: Message): Unit = {
+    checkpointManager.update(message.timestamp.toEpochMilli)
+    invoke(message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+    if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+      checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+        val serialized = snapshot
+        checkpointManager.checkpoint(checkpointTime, serialized)
+        reportCheckpointClock(checkpointTime)
+      }
+    }
+  }
+
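The quoted `StatefulTask` is a template method: `onStart` recovers from the latest snapshot, sets `inited`, and only then calls the subclass's `open`, so `getStateInternals` throws if state is touched earlier; `onWatermarkProgress` takes a snapshot when the checkpoint manager reports one is due. The self-contained model below illustrates that ordering. `ToyCheckpointStore`, `ToyStatefulTask`, and the rule "checkpoint once the watermark is `intervalMs` past the last checkpoint" are hypothetical simplifications, not the behavior of Gearpump's `CheckpointManager` or checkpoint store.

```scala
import scala.collection.mutable

// Hypothetical in-memory checkpoint store: checkpoint time -> serialized state.
final class ToyCheckpointStore {
  val snapshots: mutable.Map[Long, Array[Byte]] = mutable.Map.empty
}

// Hypothetical model of the lifecycle in the quoted StatefulTask; not Gearpump's API.
abstract class ToyStatefulTask(store: ToyCheckpointStore, intervalMs: Long) {
  private var inited = false
  private var lastCheckpoint = 0L
  protected var state: Array[Byte] = Array.emptyByteArray

  /** Subclasses may set up state access here; it runs after recovery. */
  def open(): Unit = {}

  /** Subclasses update state for each message. */
  def invoke(payload: String): Unit

  /** Mirrors the `inited` guard in getStateInternals. */
  protected def requireInited(): Unit =
    if (!inited) throw new IllegalStateException("please init state access object in `open` method!")

  final def onStart(startTime: Long): Unit = {
    // Recover the newest snapshot taken at or before startTime, if any.
    val candidates = store.snapshots.keys.filter(_ <= startTime)
    if (candidates.nonEmpty) state = store.snapshots(candidates.max).clone()
    lastCheckpoint = startTime
    inited = true
    open()
  }

  final def onNext(payload: String): Unit = invoke(payload)

  final def onWatermarkProgress(watermark: Long): Unit =
    // Simplified rule: checkpoint once the watermark is intervalMs past the last one.
    if (watermark - lastCheckpoint >= intervalMs) {
      store.snapshots(watermark) = state.clone()
      lastCheckpoint = watermark
    }

  def stateAsString: String = new String(state, "UTF-8")
}
```

In this model, running recovery before `open` means any state handle obtained in `open` already sees the restored bytes.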
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430323
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/heap/HeapStateInternals.scala ---
@@ -0,0 +1,316 @@
+package org.apache.gearpump.streaming.refactor.state.heap
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
+import java.lang.Iterable
+import java.util
+import java.util.Map.Entry
+import java.util._
+import java.util.Objects
+
+import com.google.common.collect.Table
+import org.apache.gearpump.streaming.refactor.coder.{Coder, ListCoder, MapCoder, SetCoder}
+import org.apache.gearpump.streaming.refactor.state.{StateBinder, StateNamespace, StateSpec, StateTag}
+import org.apache.gearpump.streaming.refactor.state.api._
+import org.apache.gearpump.util.LogUtil
+
+/**
+ * a heap memory backend StateInternals implementation
+ * it will cache state in heap memory with a Guava's Table structure
+ * there are three concept :
+ * (1) namespace mapping table's row id
+ * (2) tag mapping table's field
+ * (3) state mapping table's cell value
+ */
+class HeapStateInternals[K](key: K, stateTable: Table[String, String, Array[Byte]])
+  extends StateInternals {
+
+  val LOG = LogUtil.getLogger(getClass)
+
+  private val k: K = key
--- End diff --
redundant
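The `HeapStateInternals` scaladoc describes a three-level layout: the namespace selects the table row, the state tag selects the column, and the cell holds the encoded state bytes. The sketch below models that layout with plain nested Scala maps instead of Guava's `Table`, to stay dependency-free; `ToyHeapState` and its methods are hypothetical. It also illustrates the alternative to the redundant `private val k: K = key` flagged in the review: in Scala a plain constructor parameter used inside a method is already kept as a private field, and declaring it `val key` makes it public without a second copy.

```scala
import scala.collection.mutable

// Hypothetical sketch of the row/column/cell layout described in the scaladoc:
// row = namespace, column = state tag id, cell = encoded state bytes.
// Nested maps stand in for Guava's Table[String, String, Array[Byte]].
final class ToyHeapState[K](val key: K) { // `val key` exposes the key; no extra `k` copy
  private val table = mutable.Map.empty[String, mutable.Map[String, Array[Byte]]]

  def put(namespace: String, tagId: String, bytes: Array[Byte]): Unit = {
    val row = table.getOrElseUpdate(namespace, mutable.Map.empty[String, Array[Byte]])
    row(tagId) = bytes
  }

  def get(namespace: String, tagId: String): Option[Array[Byte]] =
    table.get(namespace).flatMap(_.get(tagId))

  def clear(namespace: String, tagId: String): Unit =
    table.get(namespace).foreach { row =>
      row.remove(tagId)
      if (row.isEmpty) table.remove(namespace) // drop rows that no longer hold any cell
    }
}
```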
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123429491
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/StatefulTask.scala ---
@@ -0,0 +1,179 @@
[GitHub] incubator-gearpump pull request #190: [Gearpump 311] refactor state manageme...
Github user huafengw commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/190#discussion_r123430243
--- Diff: streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/InMemoryGlobalStateInternals.scala ---
@@ -0,0 +1,276 @@
+package org.apache.gearpump.streaming.refactor.state
+
+import java.util
+import java.util.Map.Entry
+import java.util.{ArrayList, HashSet, List, Set}
+import java.lang.Iterable
+
+import com.google.common.collect.{HashBasedTable, Table}
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.InMemoryGlobalStateInternals.InMemoryStateBinder
+import org.apache.gearpump.streaming.refactor.state.api._
+
+/**
+ * a no namespace (global) in memory state internal.
+ * it is a default implementation inspired by Apache Beam.
+ * but do not recommend use it!
+ */
+class InMemoryGlobalStateInternals[K] protected(key: K) extends StateInternals {
+
+  private val k: K = key
--- End diff --
Looks like `k` is redundant since you already have `key`
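`InMemoryGlobalStateInternals` declares its constructor `protected(key: K)`, so outside code cannot call `new` on it directly, while a companion object is still allowed to. The same review point applies: the constructor parameter `key` can be used directly wherever it is needed, with no separate `k` field. The hypothetical sketch below shows both ideas on a namespace-free map keyed only by tag id; `ToyGlobalState` and `forKey` are illustrative names, not part of the PR.

```scala
import scala.collection.mutable

// Hypothetical namespace-free ("global") state holder keyed only by tag id.
class ToyGlobalState[K] protected (key: K) {
  private val cells = mutable.Map.empty[String, String]

  // The constructor parameter is used directly; no separate `k` field is declared.
  def getKey: K = key

  def write(tagId: String, value: String): Unit = cells(tagId) = value

  def read(tagId: String): Option[String] = cells.get(tagId)
}

object ToyGlobalState {
  // A companion object may call its class's protected constructor.
  def forKey[K](key: K): ToyGlobalState[K] = new ToyGlobalState[K](key)
}
```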