[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=262608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-262608 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Jun/19 20:48 Start Date: 18/Jun/19 20:48 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #6540: [BEAM-2953] Timeseries extensions . URL: https://github.com/apache/beam/pull/6540#issuecomment-503305681 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 262608) Time Spent: 4.5h (was: 4h 20m) > Timeseries processing extensions using state API > > > Key: BEAM-2953 > URL: https://issues.apache.org/jira/browse/BEAM-2953 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.7.0 >Reporter: Reza ardeshir rokni >Priority: Minor > Time Spent: 4.5h > Remaining Estimate: 0h > > A general set of timeseries transforms that abstract the user from the > process of dealing with some of the common problems when dealing with > timeseries using BEAM (in stream or batch mode). > BEAM can be used to build out some very interesting pre-processing stages for > time series data. Some examples that will be useful: > - Downsampling time series based on simple MIN, MAX, COUNT, SUM, LAST, FIRST > - Creating a value for each downsampled window even if no value has been > emitted for the specific key. > - Loading the value of a downsample with the previous value (used in FX with > previous close being brought into current open value) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
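The three behaviours listed in the issue description (downsampling to MIN/MAX/COUNT/SUM/LAST/FIRST, emitting a value for empty windows, and carrying the previous value into the next window) can be sketched in plain Java. This is only an illustration under stated assumptions: it is not the code from pull request #6540 and does not use the Beam state API. The names `DownsampleSketch`, `Accum`, and `downsample` are hypothetical, and the input is assumed to be a single key's points sorted by timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java illustration only; not the API of the Beam timeseries extension. */
public class DownsampleSketch {

  /** Hypothetical per-window aggregate for a single key. */
  public static final class Accum {
    public final long windowStart;
    public final long count; // 0 marks a gap-filled window
    public final double first, last, min, max, sum;
    public final double previousLast; // NaN when there is no earlier window

    Accum(long windowStart, long count, double first, double last,
        double min, double max, double sum, double previousLast) {
      this.windowStart = windowStart;
      this.count = count;
      this.first = first;
      this.last = last;
      this.min = min;
      this.max = max;
      this.sum = sum;
      this.previousLast = previousLast;
    }
  }

  /**
   * Downsamples one key's points into fixed windows of windowMillis (must be > 0).
   * Assumes timestamps are sorted ascending and parallel to values.
   */
  public static List<Accum> downsample(long[] timestamps, double[] values, long windowMillis) {
    // Group values by the start of the fixed window they fall into.
    TreeMap<Long, List<Double>> byWindow = new TreeMap<>();
    for (int i = 0; i < timestamps.length; i++) {
      long start = timestamps[i] - Math.floorMod(timestamps[i], windowMillis);
      byWindow.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
    }

    List<Accum> out = new ArrayList<>();
    if (byWindow.isEmpty()) {
      return out;
    }

    double previousLast = Double.NaN;
    long end = byWindow.lastKey();
    // Visit every window between the first and last observed one, including empty ones.
    for (long start = byWindow.firstKey(); start <= end; start += windowMillis) {
      List<Double> vs = byWindow.get(start);
      Accum accum;
      if (vs == null) {
        // Gap: no data in this window, so repeat the previous LAST value.
        accum = new Accum(start, 0, previousLast, previousLast,
            previousLast, previousLast, 0.0, previousLast);
      } else {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0.0;
        for (double v : vs) {
          min = Math.min(min, v);
          max = Math.max(max, v);
          sum += v;
        }
        accum = new Accum(start, vs.size(), vs.get(0), vs.get(vs.size() - 1),
            min, max, sum, previousLast);
      }
      out.add(accum);
      previousLast = accum.last; // carried into the next window, like a previous close
    }
    return out;
  }

  public static void main(String[] args) {
    long[] ts = {0L, 400L, 1200L, 3100L};
    double[] vs = {1.0, 3.0, 2.0, 5.0};
    for (Accum a : downsample(ts, vs, 1000L)) {
      System.out.println(a.windowStart + " count=" + a.count + " last=" + a.last
          + " previousLast=" + a.previousLast);
    }
  }
}
```

In a streaming pipeline the gap-filling step cannot simply loop over windows as this sketch does, because an empty window produces no element to trigger processing; that is presumably why the issue title refers to the state API, where a timer can fire for a key even when no data arrived.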
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=262611&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-262611 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Jun/19 20:48 Start Date: 18/Jun/19 20:48 Worklog Time Spent: 10m Work Description: stale[bot] commented on pull request #6540: [BEAM-2953] Timeseries extensions . URL: https://github.com/apache/beam/pull/6540 Issue Time Tracking --- Worklog Id: (was: 262611) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=258074&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-258074 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 11/Jun/19 20:00 Start Date: 11/Jun/19 20:00 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #6540: [BEAM-2953] Timeseries extensions . URL: https://github.com/apache/beam/pull/6540#issuecomment-501000256 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions. Issue Time Tracking --- Worklog Id: (was: 258074) Time Spent: 4h 20m (was: 4h 10m)
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155678&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155678 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Oct/18 02:22 Start Date: 18/Oct/18 02:22 Worklog Time Spent: 10m Work Description: rezarokni commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226150860 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/io/tf/TFExampleToBytes.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.timeseries.io.tf; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.tensorflow.example.Example; + +/** Convert TensorFlow Example object to byte[]. 
*/ +@Experimental +public class TFExampleToBytes extends DoFn { Review comment: This was mainly used for pushing data into TFRecords, however in the last commit I have modified the code so that rather than use a transform I make use of a lambda function which calls the getBytes: https://github.com/rezarokni/beam/blob/timeseries/examples/java/src/main/java/org/apache/beam/examples/timeseries/TimeSeriesExampleToFile.java https://github.com/rezarokni/beam/blob/timeseries/sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/utils/TSAccumSequences.java#L50 So I think given this refactor I should be able to just delete these ToBytes transforms... This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 155678) Time Spent: 4h 10m (was: 4h) > Timeseries processing extensions using state API > > > Key: BEAM-2953 > URL: https://issues.apache.org/jira/browse/BEAM-2953 > Project: Beam > Issue Type: Improvement > Components: sdk-ideas >Affects Versions: 2.7.0 >Reporter: Reza ardeshir rokni >Assignee: Reuven Lax >Priority: Minor > Time Spent: 4h 10m > Remaining Estimate: 0h > > A general set of timeseries transforms that abstract the user from the > process of dealing with some of the common problems when dealing with > timeseries using BEAM (in stream or batch mode). > BEAM can be used to build out some very interesting pre-processing stages for > time series data. Some examples that will be useful: > - Downsampling time series based on simple MIN, MAX, COUNT, SUM, LAST, FIRST > - Creating a value for each downsampled window even if no value has been > emitted for the specific key. 
> - Loading the value of a downsample with the previous value (used in FX with > previous close being brought into current open value) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155677&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155677 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Oct/18 02:07 Start Date: 18/Oct/18 02:07 Worklog Time Spent: 10m Work Description: rezarokni commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226149066 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/transforms/GetValueFromKV.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.timeseries.transforms; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** + * Extract the value from a KV. + * + * @param + */ +@Experimental +public class GetValueFromKV extends DoFn, T> { Review comment: Perfectly ! :-) Will change over to using the inbuilt transform. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 155677) Time Spent: 4h (was: 3h 50m)
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155659&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155659 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Oct/18 00:24 Start Date: 18/Oct/18 00:24 Worklog Time Spent: 10m Work Description: akedin commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226136139 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/transforms/GetValueFromKV.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.timeseries.transforms; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** + * Extract the value from a KV. + * + * @param + */ +@Experimental +public class GetValueFromKV extends DoFn, T> { Review comment: Wouldn't [this](https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java#L43) work? 
Issue Time Tracking --- Worklog Id: (was: 155659) Time Spent: 3h 50m (was: 3h 40m)
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155658&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155658 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 18/Oct/18 00:16 Start Date: 18/Oct/18 00:16 Worklog Time Spent: 10m Work Description: akedin commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226135037 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/io/tf/TFExampleToBytes.java ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.extensions.timeseries.io.tf; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.tensorflow.example.Example; + +/** Convert TensorFlow Example object to byte[]. */ +@Experimental +public class TFExampleToBytes extends DoFn { Review comment: Can you share an example where you use these `TFExample*` `DoFns`? Beam elements are all technically byte arrays, and Beam has Coders to serialize and deserialize elements to/from bytes. 
And ultimately the IOs should understand how to do that. If these were some custom classes for which you needed to implement custom serialization, then it would probably make sense to implement custom coders. But these specific classes are subclasses of protobuf messages, which Beam should have good support for (e.g. there's `ProtoCoder` ([link](https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java)) that's used in GCP IOs, and there's `CoderUtils` [stuff](https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L50) that can probably be used in conjunction). So my intuition is that it should be possible to use some generic solution that already exists in Beam to implement byte conversion, instead of creating custom classes for a specific use case. Maybe I'm wrong here, and usage examples (and/or tests) would help understand this part. Issue Time Tracking --- Worklog Id: (was: 155658) Time Spent: 3h 40m (was: 3.5h)
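The reviewer's suggestion, one generic byte-conversion path instead of a dedicated `*ToBytes` class per type, can be illustrated without Beam on the classpath. The sketch below is a plain-Java stand-in under stated assumptions: `SimpleCoder`, `UTF8`, and `toBytes` are hypothetical names, and the interface is deliberately simpler than Beam's own `Coder`, which works on streams. A string coder stands in for the protobuf coder mentioned in the comment.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Plain-Java illustration of the coder idea; not Beam's Coder API. */
public class CoderSketch {

  /** Hypothetical minimal coder: turns a value into bytes and back. */
  public interface SimpleCoder<T> {
    byte[] encode(T value);

    T decode(byte[] bytes);
  }

  /** Example coder for strings, standing in for a protobuf message coder. */
  public static final SimpleCoder<String> UTF8 =
      new SimpleCoder<String>() {
        @Override
        public byte[] encode(String value) {
          return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String decode(byte[] bytes) {
          return new String(bytes, StandardCharsets.UTF_8);
        }
      };

  /** One generic helper covers every type that has a coder, so no per-type class is needed. */
  public static <T> Function<T, byte[]> toBytes(SimpleCoder<T> coder) {
    return coder::encode;
  }

  public static void main(String[] args) {
    byte[] bytes = toBytes(UTF8).apply("tick");
    System.out.println(UTF8.decode(bytes)); // prints "tick"
  }
}
```

The design point is that the conversion logic lives in the coder, and the pipeline step is a one-line function built from it, which is roughly what the later reply in this thread describes doing with a lambda.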
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155622 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 17/Oct/18 21:51 Start Date: 17/Oct/18 21:51 Worklog Time Spent: 10m Work Description: rezarokni commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226107255 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/transforms/OrderOutput.java ## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.timeseries.transforms; + +import com.google.common.collect.Lists; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.extensions.timeseries.TimeSeriesOptions; +import org.apache.beam.sdk.extensions.timeseries.configuration.TSConfiguration; +import org.apache.beam.sdk.extensions.timeseries.protos.TimeSeriesData; +import org.apache.beam.sdk.extensions.timeseries.utils.TSAccums; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.*; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Create ordered output from the fixed windowed aggregations. */ +@SuppressWarnings("serial") +@Experimental +public class OrderOutput +extends PTransform< +PCollection>, +PCollection>> { + + private static final Logger LOG = LoggerFactory.getLogger(OrderOutput.class); + + @Override + public PCollection> expand( + PCollection> input) { + +TSConfiguration options = +TSConfiguration.createConfigurationFromOptions( +input.getPipeline().getOptions().as(TimeSeriesOptions.class)); + +// Move into Global Time Domain, this allows Keyed State to retain its value across windows. 
+// Late Data is dropped at this stage. + +PCollection> windowNoLateData = +input.apply( +"Global Window", +Window.>into(new GlobalWindows()) +.withAllowedLateness(Duration.ZERO)); + +return windowNoLateData +.apply(ParDo.of(new GetPreviousData(options))) +.apply( +"Re Window post Global", +Window.>into( +FixedWindows.of(options.downSampleDuration())) +// TODO: DirectRunner not showing results with exact late date match + //.withAllowedLateness(options.downSampleDuration().plus(options.downSampleDuration())) +.withAllowedLateness(Duration.standardDays(1)) +.discardingFiredPanes()); + } + + /** + * When a new key is seen (state == null) for the first time, we will create a timer to fire in + * the next window boundary. If this is not the first time the key is seen we check the ttl to see + * if a new timer is required. In-between timers firing, we will add all new elements to a List. + * lets have 3 elements coming in at various time and then there is NULL for forth time slice + * [t1,t2,t3, NULL] t1 arrives and we set timer to fire at t1.plus(downsample duration + fixed + * offset). + * + * Then we have state.set(t1). t2 arriv
[jira] [Work logged] (BEAM-2953) Timeseries processing extensions using state API
[ https://issues.apache.org/jira/browse/BEAM-2953?focusedWorklogId=155610&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-155610 ] ASF GitHub Bot logged work on BEAM-2953: Author: ASF GitHub Bot Created on: 17/Oct/18 21:08 Start Date: 17/Oct/18 21:08 Worklog Time Spent: 10m Work Description: akedin commented on a change in pull request #6540: [BEAM-2953] Advanced Timeseries examples. URL: https://github.com/apache/beam/pull/6540#discussion_r226095495 ## File path: sdks/java/extensions/timeseries/src/main/java/org/apache/beam/sdk/extensions/timeseries/transforms/OrderOutput.java ## @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.extensions.timeseries.transforms; + +import com.google.common.collect.Lists; +import com.google.protobuf.util.Durations; +import com.google.protobuf.util.Timestamps; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; +import org.apache.beam.sdk.extensions.timeseries.TimeSeriesOptions; +import org.apache.beam.sdk.extensions.timeseries.configuration.TSConfiguration; +import org.apache.beam.sdk.extensions.timeseries.protos.TimeSeriesData; +import org.apache.beam.sdk.extensions.timeseries.utils.TSAccums; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.*; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Create ordered output from the fixed windowed aggregations. */ +@SuppressWarnings("serial") +@Experimental +public class OrderOutput +extends PTransform< +PCollection>, +PCollection>> { + + private static final Logger LOG = LoggerFactory.getLogger(OrderOutput.class); + + @Override + public PCollection> expand( + PCollection> input) { + +TSConfiguration options = +TSConfiguration.createConfigurationFromOptions( +input.getPipeline().getOptions().as(TimeSeriesOptions.class)); + +// Move into Global Time Domain, this allows Keyed State to retain its value across windows. 
+// Late Data is dropped at this stage. + +PCollection> windowNoLateData = +input.apply( +"Global Window", +Window.>into(new GlobalWindows()) +.withAllowedLateness(Duration.ZERO)); + +return windowNoLateData +.apply(ParDo.of(new GetPreviousData(options))) +.apply( +"Re Window post Global", +Window.>into( +FixedWindows.of(options.downSampleDuration())) +// TODO: DirectRunner not showing results with exact late date match + //.withAllowedLateness(options.downSampleDuration().plus(options.downSampleDuration())) +.withAllowedLateness(Duration.standardDays(1)) +.discardingFiredPanes()); + } + + /** + * When a new key is seen (state == null) for the first time, we will create a timer to fire in + * the next window boundary. If this is not the first time the key is seen we check the ttl to see + * if a new timer is required. In-between timers firing, we will add all new elements to a List. + * lets have 3 elements coming in at various time and then there is NULL for forth time slice + * [t1,t2,t3, NULL] t1 arrives and we set timer to fire at t1.plus(downsample duration + fixed + * offset). + * + * Then we have state.set(t1). t2 arrives