[jira] [Updated] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-9373: ------------------------------ Description: Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, even when _iterator.isValid()_ returns true there may still be an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html (was: Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, a false _iterator.isValid()_ may also be caused by an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html) > Always call RocksIterator.status() to check the internal error of RocksDB > -------------------------------------------------------------------------- > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Blocker > > Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, even when _iterator.isValid()_ returns true there may still be an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou reopened FLINK-9373: ------------------------------- > Always call RocksIterator.status() to check the internal error of RocksDB > -------------------------------------------------------------------------- > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Blocker > > Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, a false _iterator.isValid()_ may also be caused by an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
[ https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou closed FLINK-9373. ----------------------------- Resolution: Invalid > Always call RocksIterator.status() to check the internal error of RocksDB > -------------------------------------------------------------------------- > > Key: FLINK-9373 > URL: https://issues.apache.org/jira/browse/FLINK-9373 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Affects Versions: 1.5.0 > Reporter: Sihua Zhou > Assignee: Sihua Zhou > Priority: Blocker > > Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, a false _iterator.isValid()_ may also be caused by an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB
Sihua Zhou created FLINK-9373: ------------------------------ Summary: Always call RocksIterator.status() to check the internal error of RocksDB Key: FLINK-9373 URL: https://issues.apache.org/jira/browse/FLINK-9373 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.5.0 Reporter: Sihua Zhou Assignee: Sihua Zhou Currently, when using RocksIterator we only use _iterator.isValid()_ to check whether we have reached the end of the iterator. But that is not enough: as RocksDB's wiki https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, a false _iterator.isValid()_ may also be caused by an internal error. A safer way to use the _RocksIterator_ is to always call _iterator.status()_ to check for internal errors in _RocksDB_. There is a case from the user mailing list that seems to have lost data because of this: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
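[Editor's note] For context, the safe iteration pattern the issue calls for can be sketched in a few lines. This is only an illustration of the _isValid()_ / _status()_ contract described in the RocksDB wiki, not Flink's actual patch; the wrapper class name is invented for the example.

    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    public final class SafeRocksIteration {

        /**
         * Iterates over all entries and then surfaces RocksDB's internal status.
         * isValid() == false can mean either "end of data" or "internal error";
         * only status() distinguishes the two cases.
         */
        public static void forEachEntry(RocksIterator iterator) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                byte[] key = iterator.key();
                byte[] value = iterator.value();
                // ... process key / value here ...
            }
            try {
                // throws RocksDBException if iteration stopped because of an internal error
                iterator.status();
            } catch (RocksDBException e) {
                throw new RuntimeException("RocksDB reported an internal error during iteration", e);
            }
        }
    }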
[jira] [Commented] (FLINK-9292) Remove TypeInfoParser
[ https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476930#comment-16476930 ] ASF GitHub Bot commented on FLINK-9292: --------------------------------------- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5970 cc @tzulitai > Remove TypeInfoParser > --------------------- > > Key: FLINK-9292 > URL: https://issues.apache.org/jira/browse/FLINK-9292 > Project: Flink > Issue Type: Task > Components: Core > Reporter: Stephan Ewen > Assignee: vinoyang > Priority: Major > > The {{TypeInfoParser}} has been deprecated in favor of the {{TypeHint}}. > Because the TypeInfoParser also does not work correctly with respect to classloading, we should remove it. Users still find the class, try to use it, and run into problems. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5970 cc @tzulitai ---
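[Editor's note] For reference, the migration away from the deprecated parser that the issue above points to is a one-liner. A minimal sketch, assuming the common case of a tuple type (the class name here is invented for illustration):

    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class TypeHintMigrationExample {
        public static void main(String[] args) {
            // before (deprecated, and sensitive to classloading problems):
            //   TypeInformation<?> info = TypeInfoParser.parse("Tuple2<String, Long>");
            // after, with TypeHint:
            TypeInformation<Tuple2<String, Long>> info =
                    TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
            System.out.println(info);
        }
    }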
[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors
[ https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476927#comment-16476927 ] ASF GitHub Bot commented on FLINK-9299: --------------------------------------- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6001 cc @tzulitai @fhueske > ProcessWindowFunction documentation Java examples have errors > -------------------------------------------------------------- > > Key: FLINK-9299 > URL: https://issues.apache.org/jira/browse/FLINK-9299 > Project: Flink > Issue Type: Bug > Components: Documentation > Affects Versions: 1.4.2 > Reporter: Ken Krugler > Assignee: vinoyang > Priority: Minor > > In looking at > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation], > I noticed a few errors... > * "This allows to incrementally compute windows" should be "This allows it to incrementally compute windows" > * DataStream input = ...; should be DataStream<Tuple2<String, Long>> input = ...; > * The getResult() method needs to cast one of the accumulator values to a double, if that's what it is going to return. > * MyProcessWindowFunction needs to extend, not implement, ProcessWindowFunction. > * MyProcessWindowFunction needs to implement a process() method, not an apply() method. > * The call to .timeWindow takes a Time parameter, not a window assigner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...
Github user yanghua commented on the issue: https://github.com/apache/flink/pull/6001 cc @tzulitai @fhueske ---
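[Editor's note] Taken together, the errors listed in the issue above imply a corrected example roughly like the following. This is a hedged sketch assuming the averaging example from the linked docs page, not the exact text of the eventual documentation fix:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    class AverageAggregate
            implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {
            return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            // the cast the issue asks for; without it this would be integer division
            return ((double) acc.f0) / acc.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    // extends (not implements) ProcessWindowFunction, and overrides process(), not apply()
    class MyProcessWindowFunction
            extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<Double> averages,
                Collector<Tuple2<String, Double>> out) {
            // with incremental aggregation the iterable holds exactly one pre-aggregated value
            out.collect(new Tuple2<>(key, averages.iterator().next()));
        }
    }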
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511081 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend
[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.
[ https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476906#comment-16476906 ] ASF GitHub Bot commented on FLINK-8659: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511280 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512949 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511258 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512862 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends.
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188512804 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBa
[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5955#discussion_r188511280 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java --- @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RichFlatMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.BroadcastStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.util.migration.MigrationVersion; +import org.apache.flink.util.Collector; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Migration ITCases for a stateful job with broadcast state. The tests are parameterized to (potentially) + * cover migrating for multiple previous Flink versions, as well as for different state backends. 
+ */ +@RunWith(Parameterized.class) +public class StatefulJobWBroadcastStateMigrationITCase extends SavepointMigrationTestBase { + + private static final int NUM_SOURCE_ELEMENTS = 4; + + // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints + private final StatefulJobSavepointMigrationITCase.ExecutionMode executionMode = + StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT; + + @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}") + public static Collection<Tuple2<MigrationVersion, String>> parameters() { + return Arrays.asList( + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.MEMORY_STATE_BACKEND_NAME), + Tuple2.of(MigrationVersion.v1_5, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME)); + } + + private final MigrationVersion testMigrateVersion; + private final String testStateBackend; + + public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception { + this.testMigrateVersion = testMigrateVersionAndBackend.f0; + this.testStateBackend = testMigrateVersionAndBackend.f1; + } + + @Test + public void testSavepoint() throws Exception { + + final int parallelism = 4; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + switch (testStateBackend) { + case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME: + env.setStateBackend(new RocksDBStateBackend(new MemoryStateBa
[jira] [Commented] (FLINK-9182) async checkpoints for timer service
[ https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476884#comment-16476884 ] ASF GitHub Bot commented on FLINK-9182: --- GitHub user makeyang opened a pull request: https://github.com/apache/flink/pull/6019 [FLINK-9182]async checkpoints for timer service ## What is the purpose of the change It adds async checkpoints for the timer service. The whole idea is based on the discussion in the previous PR for FLINK-9182: https://github.com/apache/flink/pull/5908 ## Brief change log In the sync part, a flat copy of the internal array of the priority queue; in the async part, build the key groups and write the timers per key group. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/makeyang/flink FLINK-9182-version2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6019.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6019 commit 82799922203bd6cb959c11336f71aee4def431d7 Author: makeyang Date: 2018-05-16T05:44:16Z [FLINK-9182]async checkpoints for timer service the whole idea is based on the discussion on github: https://github.com/apache/flink/pull/5908 the idea was proposed by StefanRRichter as below: "Second, I would probably suggest a simpler model for the async snapshots. You dropped the idea of making flat copies, but I wonder if this was premature. I can see that calling set.toArray(...) per keygroup could (maybe) turn out a bit slow because it has to potentially iterate and flatten linked entries. However, with async snapshots, we could get rid of the key-group partitioning of sets, and instead do a flat copy of the internal array of the priority queue. This would translate to just a single memcopy call internally, which is very efficient. In the async part, we can still partition the timers by key-group in a similar way as the copy-on-write state table does. This would avoid slowing down the event processing path (in fact improving it by unifying the sets) and also keep the approach very straightforward and less invasive." > async checkpoints for timer service > --- > > Key: FLINK-9182 > URL: https://issues.apache.org/jira/browse/FLINK-9182 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.4.2 >Reporter: makeyang >Assignee: makeyang >Priority: Minor > Fix For: 1.4.3, 1.5.1 > > > # problem description: > ## with the increase in the number of 'InternalTimer' objects, checkpointing becomes > slower and slower > # improvement design > ## maintain a stateTableVersion and snapshotVersions in InternalTimeServiceManager, which are > exactly the same things as in CopyOnWriteStateTable.
one more thing: a > read-write lock, which is used to protect snapshotVersions and > stateTableVersion > ## for each InternalTimer, add 2 more properties: create version and delete > version, besides the 3 existing properties: timestamp, key and namespace. each time > a Timer is registered in the timer service, it is created with the stateTableVersion > as its create version, while its delete version is -1. each time a timer is > deleted in the timer service, it is marked deleted by setting its delete version to the > stateTableVersion, without physically deleting it from the timer service. > ## each time we try to snapshot timers, InternalTimeServiceManager > increases its stateTableVersion and adds this stateTableVersion to > snapshotVersions. these 2 operations are protected by the write lock of > InternalTimeServiceManager. the current stateTableVersion is taken as the snapshot > version of this snapshot > ## shallow copy tuples > ## then use another thread to asynchronously snapshot the whole thing: > key serializer, namespace serializer and timers. for timers which are not > deleted (delete version is -1) and whose create version is less than the snapshot > version, serialize them. for timers whose de
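A minimal Java sketch may help make the version-tagging scheme above concrete. All names here are hypothetical illustrations of the description, not the actual patch:

```java
// Hypothetical sketch of the version-tagged timer described above;
// names are illustrative and do not reflect the actual Flink code.
public class VersionedTimer<K, N> {

    private final long timestamp;
    private final K key;
    private final N namespace;

    // stateTableVersion at registration time
    private final int createVersion;
    // stateTableVersion at logical deletion; -1 means "not deleted"
    private int deleteVersion = -1;

    VersionedTimer(long timestamp, K key, N namespace, int createVersion) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.createVersion = createVersion;
    }

    /** Mark the timer as deleted instead of physically removing it. */
    void markDeleted(int stateTableVersion) {
        this.deleteVersion = stateTableVersion;
    }

    /**
     * A timer belongs to a snapshot if it was created before the snapshot
     * version and was not deleted before that version.
     */
    boolean visibleTo(int snapshotVersion) {
        return createVersion < snapshotVersion
            && (deleteVersion == -1 || deleteVersion >= snapshotVersion);
    }
}
```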
[GitHub] flink pull request #6019: [FLINK-9182]async checkpoints for timer service
GitHub user makeyang opened a pull request: https://github.com/apache/flink/pull/6019 [FLINK-9182]async checkpoints for timer service ## What is the purpose of the change It adds async checkpoints for the timer service. The whole idea is based on the discussion in the previous PR for FLINK-9182: https://github.com/apache/flink/pull/5908 ## Brief change log In the sync part, a flat copy of the internal array of the priority queue; in the async part, build the key groups and write the timers per key group. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/makeyang/flink FLINK-9182-version2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6019.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6019 commit 82799922203bd6cb959c11336f71aee4def431d7 Author: makeyang Date: 2018-05-16T05:44:16Z [FLINK-9182]async checkpoints for timer service the whole idea is based on the discussion on github: https://github.com/apache/flink/pull/5908 the idea was proposed by StefanRRichter as below: "Second, I would probably suggest a simpler model for the async snapshots. You dropped the idea of making flat copies, but I wonder if this was premature. I can see that calling set.toArray(...) per keygroup could (maybe) turn out a bit slow because it has to potentially iterate and flatten linked entries. However, with async snapshots, we could get rid of the key-group partitioning of sets, and instead do a flat copy of the internal array of the priority queue. This would translate to just a single memcopy call internally, which is very efficient. In the async part, we can still partition the timers by key-group in a similar way as the copy-on-write state table does. This would avoid slowing down the event processing path (in fact improving it by unifying the sets) and also keep the approach very straightforward and less invasive." ---
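The quoted suggestion boils down to one cheap synchronous array copy followed by asynchronous key-group partitioning. A rough, hypothetical Java sketch of that model (not the actual code of PR #6019):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical sketch of the snapshot model from the quoted review comment.
public class TimerSnapshotSketch {

    /** Synchronous part: one flat copy of the priority queue's backing array. */
    static Object[] snapshotSynchronously(Object[] queueArray, int numTimers) {
        // a single array copy internally, cheap enough to run on the processing path
        return Arrays.copyOf(queueArray, numTimers);
    }

    /** Asynchronous part: partition the copied timers by key group before writing. */
    static <T> Map<Integer, List<T>> partitionByKeyGroup(Object[] copy, ToIntFunction<T> keyGroupOf) {
        Map<Integer, List<T>> byKeyGroup = new HashMap<>();
        for (Object o : copy) {
            @SuppressWarnings("unchecked")
            T timer = (T) o;
            byKeyGroup.computeIfAbsent(keyGroupOf.applyAsInt(timer), kg -> new ArrayList<>()).add(timer);
        }
        return byKeyGroup;
    }
}
```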
[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476866#comment-16476866 ] ASF GitHub Bot commented on FLINK-9354: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188507930 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -43,121 +43,151 @@ EXIT_CODE=0 # printf "\n==\n" # printf "Running my fancy nightly end-to-end test\n" # printf "==\n" +# start_timer # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh # EXIT_CODE=$? +# end_timer # fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running HA end-to-end test\n" printf "==\n" +start_timer $END_TO_END_DIR/test-scripts/test_ha.sh EXIT_CODE=$? +end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 EXIT_CODE=$? + end_timer fi --- End diff -- Perhaps we can refactor this whole block into a common base script for the nightly / pre-commit hook scripts. The end_timer / start_timer functions should also be located there. > print execution times for end-to-end tests > -- > > Key: FLINK-9354 > URL: https://issues.apache.org/jira/browse/FLINK-9354 > Project: Flink > Issue Type: Improvement > Components: Tests, Travis >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > We need to modify the end-to-end scripts to include the time it takes for a > test to run. > We currently don't have any clue how long a test actually runs for. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6008#discussion_r188507930 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -43,121 +43,151 @@ EXIT_CODE=0 # printf "\n==\n" # printf "Running my fancy nightly end-to-end test\n" # printf "==\n" +# start_timer # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh # EXIT_CODE=$? +# end_timer # fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running HA end-to-end test\n" printf "==\n" +start_timer $END_TO_END_DIR/test-scripts/test_ha.sh EXIT_CODE=$? +end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 EXIT_CODE=$? + end_timer fi if [ $EXIT_CODE == 0 ]; then printf "\n==\n" printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n" printf "==\n" + start_timer STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 EXIT_CODE=$? + end_timer fi --- End diff -- Perhaps we can refactor this whole block into a common base script for the nightly / pre-commit hook scripts. The end_timer / start_timer functions should also be located there. ---
[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests
[ https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476864#comment-16476864 ] ASF GitHub Bot commented on FLINK-9354: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6008 I see these in the Travis logs: ``` flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: command not found ... flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command not found ``` I think you'll have to include `common.sh` in the nightly / pre-commit hook scripts. > print execution times for end-to-end tests > -- > > Key: FLINK-9354 > URL: https://issues.apache.org/jira/browse/FLINK-9354 > Project: Flink > Issue Type: Improvement > Components: Tests, Travis >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.5.0 > > > We need to modify the end-to-end scripts to include the time it takes for a > test to run. > We currently don't have any clue how long a test actually runs for. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6008 I see these in the Travis logs: ``` flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: command not found ... flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command not found ``` I think you'll have to include `common.sh` in the nightly / pre-commit hook scripts. ---
[GitHub] flink issue #6018: [FLINK-9372] Typo on Elasticsearch website link (elastic....
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6018 Thanks for catching this @medcv! Apparently elastic.io directs to a completely different website 😅. Merging this .. ---
[jira] [Commented] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)
[ https://issues.apache.org/jira/browse/FLINK-9372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476855#comment-16476855 ] ASF GitHub Bot commented on FLINK-9372: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/6018 Thanks for catching this @medcv! Apparently elastic.io directs to a completely different website 😅. Merging this .. > Typo on Elasticsearch website link (elastic.io --> elastic.co) > -- > > Key: FLINK-9372 > URL: https://issues.apache.org/jira/browse/FLINK-9372 > Project: Flink > Issue Type: Improvement > Components: Documentation, ElasticSearch Connector >Affects Versions: 1.5.0, 1.4.1, 1.4.2, 1.5.1 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Minor > > Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-7814) Add BETWEEN expression to Table API
[ https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ruidong Li reassigned FLINK-7814: - Assignee: Ruidong Li > Add BETWEEN expression to Table API > > > Key: FLINK-7814 > URL: https://issues.apache.org/jira/browse/FLINK-7814 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Ruidong Li >Priority: Minor > > * The Table API does not have a BETWEEN expression. BETWEEN is quite handy > when defining join predicates for window joins. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW for EXTRACT
Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6007 yes, done ---
[jira] [Commented] (FLINK-8518) Support DOW, EPOCH, DECADE for EXTRACT
[ https://issues.apache.org/jira/browse/FLINK-8518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476801#comment-16476801 ] ASF GitHub Bot commented on FLINK-8518: --- Github user snuyanzin commented on the issue: https://github.com/apache/flink/pull/6007 yes, done > Support DOW, EPOCH, DECADE for EXTRACT > -- > > Key: FLINK-8518 > URL: https://issues.apache.org/jira/browse/FLINK-8518 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Sergey Nuyanzin >Priority: Major > > We upgraded Calcite to version 1.15 in FLINK-7934. The EXTRACT method > supports more conversion targets. The targets DOW, EPOCH, DECADE should be > implemented and tested for different datatypes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
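For orientation, the expected semantics of these conversion targets can be illustrated with plain java.time arithmetic. This is a hedged sketch of the PostgreSQL-style behavior that Calcite generally follows, not the Flink/Calcite implementation; in particular, the exact DOW numbering convention used by Calcite 1.15 is an assumption to verify:

```java
import java.time.LocalDate;
import java.time.ZoneOffset;

// Plain java.time illustration of the EXTRACT targets; NOT Flink/Calcite code.
public class ExtractSemantics {

    public static void main(String[] args) {
        LocalDate d = LocalDate.of(2018, 5, 16);

        // DECADE: the year divided by 10, e.g. 2018 -> 201
        long decade = d.getYear() / 10;

        // EPOCH: seconds since 1970-01-01 00:00:00 UTC
        long epoch = d.atStartOfDay(ZoneOffset.UTC).toEpochSecond();

        // DOW: day of the week. PostgreSQL counts Sunday as 0 .. Saturday as 6;
        // whether Calcite 1.15 uses this or a 1-based convention is an assumption.
        long dowIso = d.getDayOfWeek().getValue(); // Monday = 1 .. Sunday = 7
        long dowPostgres = dowIso % 7;             // Sunday = 0 .. Saturday = 6

        System.out.printf("decade=%d epoch=%d dow(iso)=%d dow(pg)=%d%n",
                decade, epoch, dowIso, dowPostgres);
    }
}
```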
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476733#comment-16476733 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486521 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala --- @@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistryImpl --- End diff -- It is already separated out as a hotfix. We have two commits here. > End-to-end test: Quickstarts > > > Key: FLINK-9008 > URL: https://issues.apache.org/jira/browse/FLINK-9008 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.6.0 > > > We could add an end-to-end test which verifies Flink's quickstarts. It should > do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486521 --- Diff: flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala --- @@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} import org.apache.flink.runtime.jobmanager.{JobManager, SubmittedJobGraphStore} import org.apache.flink.runtime.leaderelection.LeaderElectionService -import org.apache.flink.runtime.metrics.MetricRegistryImpl --- End diff -- It is already separated out as a hotfix. We have two commits here. ---
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476730#comment-16476730 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486388 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ --- End diff -- Yes. Will change. > End-to-end test: Quickstarts > > > Key: FLINK-9008 > URL: https://issues.apache.org/jira/browse/FLINK-9008 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.6.0 > > > We could add an end-to-end test which verifies Flink's quickstarts. It should > do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476731#comment-16476731 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486404 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR + +touch $T
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476732#comment-16476732 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486423 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job --- End diff -- Will do. > End-to-end test: Quickstarts > > > Key
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486423 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job --- End diff -- Will do. ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486404 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR + +touch $TEST_DATA_DIR/output + +curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + +if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then +echo "Quickstarts end to end test pass." +else +
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486388 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ --- End diff -- Yes. Will change. ---
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476724#comment-16476724 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486209 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR + +touch $T
[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts
[ https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476725#comment-16476725 ] ASF GitHub Bot commented on FLINK-9008: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486224 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 --- End diff -- Will do. > End-to-end test: Quickstarts > > > Key: FLINK-9008 > URL: https://issues.apache.org/jira/browse/FLINK-9008 > Project: Flink > Issue Type: Sub-task > Components: Quickstarts, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.6.0 > > > We could add an end-to-end test which verifies Flink's quickstarts. It should > do the following: > # create a new Flink project using the quickstarts archetype > # add a new Flink dependency to the {{pom.xml}} (e.g. 
Flink connector or > library) > # run {{mvn clean package -Pbuild-jar}} > # verify that no core dependencies are contained in the jar file > # Run the program -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486224 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 --- End diff -- Will do. ---
[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...
Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/5823#discussion_r188486209 --- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh --- @@ -0,0 +1,123 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# End to end test for quick starts test. + +CURRENT_DIR=$(cd "$( dirname "$0" )" && pwd ) + +cd $CURRENT_DIR + +mvn archetype:generate \ +-DarchetypeGroupId=org.apache.flink\ +-DarchetypeArtifactId=flink-quickstart-java\ +-DarchetypeVersion=1.5-SNAPSHOT\ +-DgroupId=org.apache.flink.quickstart \ +-DartifactId=flink-java-project\ +-Dversion=0.1 \ +-Dpackage=org.apache.flink.quickstart \ +-DinteractiveMode=false + +cd flink-java-project + +cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/ + +position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1) + +sed -i -e ''"$(($position + 1))"'i\ +<dependency>\ +<groupId>org.apache.flink</groupId>\ +<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\ +<version>${flink.version}</version>\ +</dependency>' pom.xml + +sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml + +mvn clean package -nsu + +cd target +jar tvf flink-java-project-0.1.jar > contentsInJar.txt + +if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then + +echo "Success: There are no flink core classes are contained in the jar." +else +echo "Failure: There are flink core classes are contained in the jar." +exit 1 +fi + +if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \ + `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then + +echo "Failure: Since ElasticsearchStreamingJob.class and other user classes are not included in the jar. " +exit 1 +else +echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar." 
+fi + +cd $CURRENT_DIR + +source "$(dirname "$0")"/common.sh + +start_cluster + +mkdir -p $TEST_DATA_DIR + +ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; + +curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz +tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/ +ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5 + +# start elasticsearch cluster +nohup $ELASTICSEARCH_DIR/bin/elasticsearch & + +TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar + +# run the Flink job +$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR + +touch $TEST_DATA_DIR/output + +curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output + +if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then +echo "Quickstarts end to end test pass." +else +
[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API
[ https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476720#comment-16476720 ] Xingcan Cui commented on FLINK-9219: Thanks for the feedback [~twalthr]! Most of your comments make sense to me. However, regarding the choice between the Calcite type and {{com.esri.core.geometry.Geometry}}, I still don't quite get your point. Could you explain a little more about that? Thanks, Xingcan > Add support for OpenGIS features in Table & SQL API > --- > > Key: FLINK-9219 > URL: https://issues.apache.org/jira/browse/FLINK-9219 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Xingcan Cui >Priority: Major > > CALCITE-1968 added core functionality for handling > spatial/geographical/geometry data. It should not be too hard to also expose these > features in Flink's Table & SQL API. We would need a new {{GEOMETRY}} > data type and connect the function APIs. > Right now the following functions are supported by Calcite: > {code} > ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, > ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, > ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, > ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, > ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, > ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, > ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, > ST_Z > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
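For context on what exposing these functions could look like from a user's perspective, here is a minimal, hypothetical sketch against the Flink 1.5-era Table API; the {{Taxis}} table, its columns, and the availability of the spatial functions in Flink SQL are all assumptions, since the {{GEOMETRY}} type discussed here had not been wired up yet.

{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class GeoQuerySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Hypothetical: find rides inside a polygon using the Calcite-provided
        // spatial functions listed in the issue description.
        Table inside = tableEnv.sqlQuery(
            "SELECT rideId FROM Taxis WHERE ST_Contains(" +
            "ST_GeomFromText('POLYGON ((0 0, 0 2, 2 2, 2 0, 0 0))'), " +
            "ST_Point(lon, lat))");
    }
}
{code}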
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476706#comment-16476706 ] ASF GitHub Bot commented on FLINK-8944: --- Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481796 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) { return configProps; } + public static Properties replaceDeprecatedConsumerKeys(Properties configProps) { + if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) { --- End diff -- Minor: this could be generalized by iterating over a map of old key -> new key. I would also suggest logging a warning for deprecated keys. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481796 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java --- @@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) { return configProps; } + public static Properties replaceDeprecatedConsumerKeys(Properties configProps) { + if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) { --- End diff -- Minor: this could be generalized by iterating over a map of old key -> new key. I would also suggest logging a warning for deprecated keys. ---
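A minimal sketch of the generalization suggested above, assuming a hypothetical map of old key -> new key; only STREAM_DESCRIBE_BACKOFF_BASE is visible in the quoted diff, so the replacement key name and the logger are illustrative, not taken from the PR.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeprecatedKeySketch {
    private static final Logger LOG = LoggerFactory.getLogger(DeprecatedKeySketch.class);

    // old key -> new key, one entry per deprecated consumer property
    private static final Map<String, String> DEPRECATED_CONSUMER_KEYS = new HashMap<>();
    static {
        DEPRECATED_CONSUMER_KEYS.put(
            "flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
    }

    public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
        for (Map.Entry<String, String> entry : DEPRECATED_CONSUMER_KEYS.entrySet()) {
            if (configProps.containsKey(entry.getKey())) {
                // warn once per deprecated key, as suggested in the review
                LOG.warn("Config key {} is deprecated; please use {} instead.",
                    entry.getKey(), entry.getValue());
                // move the value to the new key and drop the deprecated one
                configProps.setProperty(entry.getValue(),
                    configProps.getProperty(entry.getKey()));
                configProps.remove(entry.getKey());
            }
        }
        return configProps;
    }
}
{code}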
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476704#comment-16476704 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481562 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- I have deprecated some old properties and added the new ones. This is my first time using the @deprecated annotations. Do let me know if there are better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 for these changes. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481562 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- I have deprecated some old properties and added the new ones. This is my first time using the @deprecated annotations. Do let me know if there are better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 for these changes. ---
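For reference, the deprecation pattern being discussed usually looks like the following sketch; the new constant name is an assumption mirroring the listShards terminology, not a quote from the PR.

{code}
public class ConsumerConfigConstantsSketch {
    /**
     * The base backoff time between each describeStream attempt.
     *
     * @deprecated Please use {@link #LIST_SHARDS_BACKOFF_BASE} instead.
     */
    @Deprecated
    public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

    /** The base backoff time between each listShards attempt. */
    public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";
}
{code}

Keeping the old constant in place (annotated) together with a key-replacement step like replaceDeprecatedConsumerKeys preserves backwards compatibility for existing user configurations.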
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476701#comment-16476701 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481158 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); assertEquals(, clientConfiguration.getSocketTimeout()); } - --- End diff -- Done. Sorry for the inconvenience. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476700#comment-16476700 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481152 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); --- End diff -- Done. Sorry for the inconvenience. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481158 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); assertEquals(, clientConfiguration.getSocketTimeout()); } - --- End diff -- Done. Sorry for the inconvenience. ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188481152 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); --- End diff -- Done. Sorry for the inconvenience. ---
[jira] [Commented] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)
[ https://issues.apache.org/jira/browse/FLINK-9372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476535#comment-16476535 ] ASF GitHub Bot commented on FLINK-9372: --- GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6018 [FLINK-9372] Typo on Elasticsearch website link (elastic.io --> elastic.co) ## What is the purpose of the change Typo on Elasticsearch website link elastic.io --> elastic.co ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9372 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6018.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6018 commit 4e42304dce4bfbcab964548e4f153dc8c212b569 Author: Yadan.JS Date: 2018-05-15T22:03:22Z [FLINK-9372] Typo on Elasticsearch website link > Typo on Elasticsearch website link (elastic.io --> elastic.co) > -- > > Key: FLINK-9372 > URL: https://issues.apache.org/jira/browse/FLINK-9372 > Project: Flink > Issue Type: Improvement > Components: Documentation, ElasticSearch Connector >Affects Versions: 1.5.0, 1.4.1, 1.4.2, 1.5.1 >Reporter: Yazdan Shirvany >Assignee: Yazdan Shirvany >Priority: Minor > > Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #6018: [FLINK-9372] Typo on Elasticsearch website link (e...
GitHub user medcv opened a pull request: https://github.com/apache/flink/pull/6018 [FLINK-9372] Typo on Elasticsearch website link (elastic.io --> elastic.co) ## What is the purpose of the change Typo on Elasticsearch website link elastic.io --> elastic.co ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/medcv/flink FLINK-9372 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6018.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6018 commit 4e42304dce4bfbcab964548e4f153dc8c212b569 Author: Yadan.JS Date: 2018-05-15T22:03:22Z [FLINK-9372] Typo on Elasticsearch website link ---
[jira] [Created] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)
Yazdan Shirvany created FLINK-9372: -- Summary: Typo on Elasticsearch website link (elastic.io --> elastic.co) Key: FLINK-9372 URL: https://issues.apache.org/jira/browse/FLINK-9372 Project: Flink Issue Type: Improvement Components: Documentation, ElasticSearch Connector Affects Versions: 1.4.2, 1.4.1, 1.5.0, 1.5.1 Reporter: Yazdan Shirvany Assignee: Yazdan Shirvany Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9371) High Availability JobManager Registration Failure
Jason Kania created FLINK-9371: -- Summary: High Availability JobManager Registration Failure Key: FLINK-9371 URL: https://issues.apache.org/jira/browse/FLINK-9371 Project: Flink Issue Type: Bug Components: Core Affects Versions: 1.4.2 Reporter: Jason Kania The following error is happening intermittently on a 3-node cluster with 2 JobManagers configured in HA mode. When this happens, the two JobManager instances fail to associate with one another. 2018-05-15 19:00:06,400 INFO org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Trying to associate with JobManager leader akka.tcp://flink@aaa-1:5/user/jobmanager 2018-05-15 19:00:06,404 WARN org.apache.flink.runtime.jobmanager.JobManager - Discard message LeaderSessionMessage(0bbe70c4-2642-4a08-912f-6cc09646281f,RegisterResourceManager akka://flink/user/resourcemanager-d6567c5d-85f4-4b18-8eac-cf9725d076a5) because there is currently no valid leader id known. 2018-05-15 19:00:16,418 ERROR org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Resource manager could not register at JobManager akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [1 ms]. Sender[null] sent message of type "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage". at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) at java.lang.Thread.run(Thread.java:748) Sometimes the following type of log entry also appears after the previous one: 2018-05-15 19:13:47,525 WARN org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager - Discard message LeaderSessionMessage(5cab29b9-10d3-4b25-b934-f06b82be15b5,TriggerRegistrationAtJobManager akka.tcp://flink@aaa-1:5/user/jobmanager) because the expected leader session ID 61075587-51da-4e58-ac4f-9ea118ccdde9 did not equal the received leader session ID 5cab29b9-10d3-4b25-b934-f06b82be15b5. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 Also, as for the package name or the place to put it, I don't feel competent to suggest one, so I will be happy to apply your suggestion. ---
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476320#comment-16476320 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 Also, as for the package name or the place to put it, I don't feel competent to suggest one, so I will be happy to apply your suggestion. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 As for the snapshot binary data, I do understand that it should be created with the appropriate Flink version (in this case, in theory, with Flink 1.3), and I tried really hard to do so until I found out that this test is incompatible with 1.3 and the data could not be generated with Flink 1.3. Later I found the comment on the test class that also states so: > Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) > * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already. > * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types) > * works properly. Also, the commented code does not compile with Flink 1.3 (but this is a minor thing). Data serialized with the version of Avro used in Flink 1.3 (1.7.7) is not binary compatible with Avro 1.8.2 (used in Flink 1.4+), due to changes in how SpecificFixed is constructed. Therefore, I regenerated the snapshot data by running the commented code on the current branch. That is also why I changed a few descriptions in that test, as it tests compatibility of `PojoSerializer` with `AvroSerializer` rather than binary backwards compatibility. Nevertheless, I am more than happy to hear any comments on that. ---
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476314#comment-16476314 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/5995 As for the snapshot binary data, I do understand that it should be created with the appropriate Flink version (in this case, in theory, with Flink 1.3), and I tried really hard to do so until I found out that this test is incompatible with 1.3 and the data could not be generated with Flink 1.3. Later I found the comment on the test class that also states so: > Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) > * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already. > * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types) > * works properly. Also, the commented code does not compile with Flink 1.3 (but this is a minor thing). Data serialized with the version of Avro used in Flink 1.3 (1.7.7) is not binary compatible with Avro 1.8.2 (used in Flink 1.4+), due to changes in how SpecificFixed is constructed. Therefore, I regenerated the snapshot data by running the commented code on the current branch. That is also why I changed a few descriptions in that test, as it tests compatibility of `PojoSerializer` with `AvroSerializer` rather than binary backwards compatibility. Nevertheless, I am more than happy to hear any comments on that. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
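As background, regenerating the snapshot bytes boils down to serializing a record with whatever Avro version is on the classpath of the current branch. A minimal sketch, independent of Flink's serializer-snapshot framing (the helper name is illustrative):

{code}
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

public class SnapshotDataSketch {
    /** Serializes an Avro-generated record to binary using the classpath's Avro version. */
    public static <T extends SpecificRecord> byte[] toAvroBytes(T record, Class<T> clazz)
            throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<>(clazz).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}
{code}

The incompatibility described above is at the class level (how SpecificFixed subclasses are constructed), so generated classes built against Avro 1.8.2 cannot be exercised by a Flink 1.3 code path built against 1.7.7, which is why the data was regenerated on the current branch.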
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476299#comment-16476299 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386920 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { // Utilities // + private static boolean isGenericRecord(Class type) { + return !SpecificRecord.class.isAssignableFrom(type) && + GenericRecord.class.isAssignableFrom(type); + } + @Override public TypeSerializer duplicate() { - return new AvroSerializer<>(type); + if (schemaString != null) { + return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString)); --- End diff -- Didn't think it through well. Thought we need to create a deep copy of the schema, but as it is stateless I think we can just pass the schema. My mistake. Correct me if I am wrong. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386920 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) { // Utilities // + private static boolean isGenericRecord(Class type) { + return !SpecificRecord.class.isAssignableFrom(type) && + GenericRecord.class.isAssignableFrom(type); + } + @Override public TypeSerializer duplicate() { - return new AvroSerializer<>(type); + if (schemaString != null) { + return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString)); --- End diff -- Didn't think it through well. Thought we need to create a deep copy of the schema, but as it is stateless I think we can just pass the schema. My mistake. Correct me if I am wrong. ---
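For readers following along: org.apache.avro.Schema instances are immutable once parsed, so sharing one instance between serializer duplicates is safe and avoids re-parsing the JSON string. A small self-contained illustration of that point (not Flink code):

{code}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class SchemaSharingSketch {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("User").fields()
            .requiredString("name")
            .endRecord();

        // Sharing the reference is cheap and safe, since Schema is immutable...
        Schema shared = schema;

        // ...whereas round-tripping through JSON works but is wasteful.
        Schema reparsed = new Schema.Parser().parse(schema.toString());

        System.out.println(shared.equals(reparsed)); // true: structurally identical
    }
}
{code}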
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386286 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. 
+*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { +
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476291#comment-16476291 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188386286 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. 
+*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordCla
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476283#comment-16476283 ] ASF GitHub Bot commented on FLINK-9337: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188385527 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. 
+*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. +*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordCla
[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188385527 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; + + /** +* Reader that deserializes byte array into a record. +*/ + private transient GenericDatumReader datumReader; + + /** +* Input stream to read message from. +*/ + private transient MutableByteArrayInputStream inputStream; + + /** +* Avro decoder that decodes binary data. +*/ + private transient Decoder decoder; + + /** +* Avro schema for the reader. 
+*/ + private transient Schema reader; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be one of: +*{@link org.apache.avro.specific.SpecificRecord}, +*{@link org.apache.avro.generic.GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +*/ + AvroDeserializationSchema(Class recordClazz, @Nullable Schema reader) { + Preconditions.checkNotNull(recordClazz, "Avro record class must not be null."); + this.recordClazz = recordClazz; + this.inputStream = new MutableByteArrayInputStream(); + this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null); + this.reader = reader; + if (reader != null) { +
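To summarize the intended usage being reviewed here: for GenericRecord the reader schema must be supplied up front, since there is no generated class to derive it from. A minimal sketch, assuming a factory method along the lines of forGeneric(Schema) (the quoted diff only shows the package-private constructor, so the factory name is an assumption):

{code}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import org.apache.flink.formats.avro.AvroDeserializationSchema;

public class AvroDeserSketch {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Produce a binary-encoded record, standing in for e.g. a Kafka message payload.
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "flink");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(user, encoder);
        encoder.flush();

        // For GenericRecord the reader schema must be supplied up front.
        AvroDeserializationSchema<GenericRecord> deser =
            AvroDeserializationSchema.forGeneric(schema);
        GenericRecord decoded = deser.deserialize(out.toByteArray());
        System.out.println(decoded.get("name")); // prints "flink"
    }
}
{code}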
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476260#comment-16476260 ] ASF GitHub Bot commented on FLINK-8944: --- Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188379050 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); assertEquals(, clientConfiguration.getSocketTimeout()); } - --- End diff -- remove unnecessary change > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476257#comment-16476257 ] ASF GitHub Bot commented on FLINK-8944: --- Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378986 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); --- End diff -- remove unnecessary change > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378986 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); --- End diff -- remove unnecessary change ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188379050 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -97,8 +228,7 @@ public void testClientConfigOverride() { AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient"); ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, - "clientConfiguration"); + "clientConfiguration"); assertEquals(, clientConfiguration.getSocketTimeout()); } - --- End diff -- remove unnecessary change ---
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476252#comment-16476252 ] ASF GitHub Bot commented on FLINK-8944: --- Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378324 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- I think that this should be one unit of work. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188378324 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- I think that this should be one unit of work. ---
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476243#comment-16476243 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377363 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- For the time being, I will keep the older names and make the changes to deprecate the property names in a follow-up PR. I am not sure what the policy is with respect to change size, but I feel breaking this up into 2 different PRs will make it easier to review. Let me know if you feel otherwise and I will pull those changes into this PR. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently, the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per second per account). This is > problematic when running multiple Flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards allows 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476244#comment-16476244 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377376 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. + while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. 
Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); + break; + } + } catch (ResourceNotFoundException
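For reference, the full-jitter backoff used in the retry loop above draws a uniformly random sleep time below an exponentially growing cap. A minimal sketch of that strategy (parameter names are illustrative, not Flink's actual fields):
{code:java}
import java.util.concurrent.ThreadLocalRandom;

public final class FullJitterBackoff {

    /**
     * Returns a sleep time drawn uniformly from [0, min(maxMillis, baseMillis * power^attempt)]:
     * the cap grows exponentially with the attempt count, the actual wait is randomized.
     */
    public static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long exponentialCap = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (ThreadLocalRandom.current().nextDouble() * exponentialCap);
    }
}
{code}
On each LimitExceededException the caller sleeps for the returned amount and retries, as the diff above does.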
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476246#comment-16476246 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377415 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { private List getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List shardsOfStream = new ArrayList<>(); - DescribeStreamResult describeStreamResult; + // List Shards returns just the first 1000 shard entries. In order to read the entire stream, + // we need to use the returned nextToken to get additional shards. + ListShardsResult listShardsResult; + String startShardToken = null; do { - describeStreamResult = describeStream(streamName, lastSeenShardId); - - List shards = describeStreamResult.getStreamDescription().getShards(); + listShardsResult = listShards(streamName, lastSeenShardId, startShardToken); --- End diff -- Thanks for catching this. I have fixed this by clearing shardsOfStream, so that we return an empty shardsOfStream in case of ExpiredTokenException. The intended behavior is: in the unlikely case of an expired next token, we simply return an empty shardsOfStream. This should be alright, since when no new shards are discovered we end up returning an empty shardsOfStream by default anyway. > Use ListShards for shard discovery in the flink kinesis connector > - > > Key: FLINK-8944 > URL: https://issues.apache.org/jira/browse/FLINK-8944 > Project: Flink > Issue Type: Improvement >Reporter: Kailash Hassan Dayanand >Priority: Minor > > Currently the DescribeStream AWS API used to get the list of shards has > restricted rate limits on AWS (5 requests per sec per account). This is > problematic when running multiple flink jobs on the same account, since each > subtask calls DescribeStream. Changing this to ListShards will provide > more flexibility on rate limits, as ListShards has a limit of 100 requests per second > per data stream. > More details on the mailing list: https://goo.gl/mRXjKh -- This message was sent by Atlassian JIRA (v7.6.3#76005)
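The pagination and expired-token behavior described in this comment can be sketched as follows. This is a simplified reconstruction, not the PR's actual code: listShards(...) stands for the retrying wrapper from the diff, and the exact exception type is an assumption.
{code:java}
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

// ListShards returns at most 1000 shards per call, so keep following
// nextToken until it comes back null.
private List<Shard> getShardsOfStream(String streamName, String lastSeenShardId)
        throws InterruptedException {
    List<Shard> shardsOfStream = new ArrayList<>();
    String startShardToken = null;
    try {
        ListShardsResult listShardsResult;
        do {
            listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
            shardsOfStream.addAll(listShardsResult.getShards());
            startShardToken = listShardsResult.getNextToken();
        } while (startShardToken != null);
    } catch (ExpiredNextTokenException e) {
        // The token expires after ~300s; drop any partial result and return an
        // empty list, which the caller already treats as "no new shards discovered".
        shardsOfStream.clear();
    }
    return shardsOfStream;
}
{code}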
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476245#comment-16476245 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377393 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. + while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. 
Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); + break; + } + } catch (ResourceNotFoundException
[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476247#comment-16476247 ] ASF GitHub Bot commented on FLINK-8944: --- Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377434 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() { } @Test - public void testCustomConfigurationOverride() { - Properties configProps = new Properties(); - configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisProxy proxy = new KinesisProxy(configProps) { - @Override - protected AmazonKinesis createKinesisClient(Properties configProps) { - ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig(); - clientConfig.setSocketTimeout(1); - return AWSUtil.createKinesisClient(configProps, clientConfig); + public void testGetShardList() throws Exception { + List shardIds = + Arrays.asList( + "shardId-", + "shardId-0001", + "shardId-0002", + "shardId-0003"); + shardIdSet = new HashSet<>(shardIds); + shards = + shardIds + .stream() + .map(shardId -> new Shard().withShardId(shardId)) + .collect(Collectors.toList()); + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey"); + kinesisConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey"); + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + AmazonKinesis mockClient = mock(AmazonKinesis.class); + Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient); + + ListShardsResult responseWithMoreData = + new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); + ListShardsResult responseFinal = + new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); + doReturn(responseWithMoreData) + .when(mockClient) + .listShards(argThat(initialListShardsRequestMatcher())); + doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN))); + HashMap streamHashMap = + createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName)); + GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); + + Assert.assertEquals(shardListResult.hasRetrievedShards(), true); + + Set expectedStreams = new HashSet<>(); + expectedStreams.add(fakeStreamName); + Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + List actualShardList = + shardListResult.getRetrievedShardListOfStream(fakeStreamName); + List expectedStreamShard = new ArrayList<>(); + System.out.println(actualShardList.toString()); + assertThat(actualShardList, hasSize(4)); + for (int i = 0; i < 4; i++) { + StreamShardHandle shardHandle = + new StreamShardHandle( + fakeStreamName, + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))); + expectedStreamShard.add(shardHandle); + } + + Assert.assertThat( + actualShardList, + containsInAnyOrder( + expectedStreamShard.toArray(ne
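The two-page stubbing that drives this test can be condensed into a helper like the one below (a sketch using the Mockito 1.x style visible in the test; names are illustrative):
{code:java}
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import org.mockito.ArgumentMatcher;

// First call (no token) returns page1 with a nextToken; second call (with that
// token) returns page2, so getShardList() must issue exactly two ListShards calls.
static AmazonKinesis twoPageMock(final String token, ListShardsResult page1, ListShardsResult page2) {
    AmazonKinesis client = mock(AmazonKinesis.class);
    doReturn(page1).when(client).listShards(argThat(new ArgumentMatcher<ListShardsRequest>() {
        @Override
        public boolean matches(Object o) {
            return o instanceof ListShardsRequest && ((ListShardsRequest) o).getNextToken() == null;
        }
    }));
    doReturn(page2).when(client).listShards(argThat(new ArgumentMatcher<ListShardsRequest>() {
        @Override
        public boolean matches(Object o) {
            return o instanceof ListShardsRequest && token.equals(((ListShardsRequest) o).getNextToken());
        }
    }));
    return client;
}
{code}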
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377393 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. + while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); + break; + } + } catch (ResourceNotFoundException
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377434 --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java --- @@ -70,20 +113,107 @@ public void testIsRecoverableExceptionWithNullErrorType() { } @Test - public void testCustomConfigurationOverride() { - Properties configProps = new Properties(); - configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); - KinesisProxy proxy = new KinesisProxy(configProps) { - @Override - protected AmazonKinesis createKinesisClient(Properties configProps) { - ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig(); - clientConfig.setSocketTimeout(1); - return AWSUtil.createKinesisClient(configProps, clientConfig); + public void testGetShardList() throws Exception { + List shardIds = + Arrays.asList( + "shardId-", + "shardId-0001", + "shardId-0002", + "shardId-0003"); + shardIdSet = new HashSet<>(shardIds); + shards = + shardIds + .stream() + .map(shardId -> new Shard().withShardId(shardId)) + .collect(Collectors.toList()); + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "fake_accesskey"); + kinesisConsumerConfig.setProperty( + ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "fake_secretkey"); + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + AmazonKinesis mockClient = mock(AmazonKinesis.class); + Whitebox.setInternalState(kinesisProxy, "kinesisClient", mockClient); + + ListShardsResult responseWithMoreData = + new ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN); + ListShardsResult responseFinal = + new ListShardsResult().withShards(shards.subList(2, shards.size())).withNextToken(null); + doReturn(responseWithMoreData) + .when(mockClient) + .listShards(argThat(initialListShardsRequestMatcher())); + doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN))); + HashMap streamHashMap = + createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName)); + GetShardListResult shardListResult = kinesisProxy.getShardList(streamHashMap); + + Assert.assertEquals(shardListResult.hasRetrievedShards(), true); + + Set expectedStreams = new HashSet<>(); + expectedStreams.add(fakeStreamName); + Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), expectedStreams); + List actualShardList = + shardListResult.getRetrievedShardListOfStream(fakeStreamName); + List expectedStreamShard = new ArrayList<>(); + System.out.println(actualShardList.toString()); + assertThat(actualShardList, hasSize(4)); + for (int i = 0; i < 4; i++) { + StreamShardHandle shardHandle = + new StreamShardHandle( + fakeStreamName, + new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))); + expectedStreamShard.add(shardHandle); + } + + Assert.assertThat( + actualShardList, + containsInAnyOrder( + expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]))); + } + + private static class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher { + private final String shardId; + private final String nextToken; +
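The matcher cut off above most likely continues along these lines; this is a hypothetical reconstruction based on the visible fields, not the PR's actual code:
{code:java}
import java.util.Objects;

import com.amazonaws.services.kinesis.model.ListShardsRequest;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;

// Matches a ListShardsRequest that carries the expected
// exclusiveStartShardId / nextToken pair.
class ListShardsRequestMatcher extends TypeSafeDiagnosingMatcher<ListShardsRequest> {
    private final String shardId;
    private final String nextToken;

    ListShardsRequestMatcher(String shardId, String nextToken) {
        this.shardId = shardId;
        this.nextToken = nextToken;
    }

    @Override
    protected boolean matchesSafely(ListShardsRequest request, Description mismatch) {
        if (!Objects.equals(shardId, request.getExclusiveStartShardId())
                || !Objects.equals(nextToken, request.getNextToken())) {
            mismatch.appendText("was ").appendValue(request);
            return false;
        }
        return true;
    }

    @Override
    public void describeTo(Description description) {
        description.appendText("ListShardsRequest with shardId=").appendValue(shardId)
                .appendText(" and nextToken=").appendValue(nextToken);
    }
}
{code}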
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377363 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java --- @@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */ public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format"; - /** The base backoff time between each describeStream attempt. */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; --- End diff -- For the time being, I will keep the older names and deprecate the property names in a follow-up PR. I am not sure what the policy is with respect to the size of changes, but I feel breaking this up into 2 different PRs will make it easier to review. Let me know if you feel otherwise, and I will pull those changes into this PR. ---
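Deprecating rather than renaming the keys usually boils down to a fallback lookup; a sketch with illustrative key names (the actual new key was left for the follow-up PR):
{code:java}
import java.util.Properties;

public final class BackoffConfig {

    /** Prefers the new key but falls back to the legacy one for compatibility. */
    public static String getListShardsBackoffBase(Properties props) {
        String value = props.getProperty("flink.list.shards.backoff.base"); // hypothetical new key
        if (value == null) {
            value = props.getProperty("flink.stream.describe.backoff.base"); // legacy key, kept working
        }
        return value;
    }
}
{code}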
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377415 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -353,19 +355,21 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { private List getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List shardsOfStream = new ArrayList<>(); - DescribeStreamResult describeStreamResult; + // List Shards returns just the first 1000 shard entries. In order to read the entire stream, + // we need to use the returned nextToken to get additional shards. + ListShardsResult listShardsResult; + String startShardToken = null; do { - describeStreamResult = describeStream(streamName, lastSeenShardId); - - List shards = describeStreamResult.getStreamDescription().getShards(); + listShardsResult = listShards(streamName, lastSeenShardId, startShardToken); --- End diff -- Thanks for catching this. I have fixed this by clearing shardsOfStream, so that we return an empty shardsOfStream in case of ExpiredTokenException. The intended behavior is: in the unlikely case of an expired next token, we simply return an empty shardsOfStream. This should be alright, since when no new shards are discovered we end up returning an empty shardsOfStream by default anyway. ---
[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...
Github user kailashhd commented on a diff in the pull request: https://github.com/apache/flink/pull/5992#discussion_r188377376 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java --- @@ -382,50 +386,62 @@ protected static boolean isRecoverableException(AmazonServiceException ex) { * @param startShardId which shard to start with for this describe operation (earlier shard's infos will not appear in result) * @return the result of the describe stream operation */ - private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException { - final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); - describeStreamRequest.setStreamName(streamName); - describeStreamRequest.setExclusiveStartShardId(startShardId); + private ListShardsResult listShards(String streamName, @Nullable String startShardId, + @Nullable String startNextToken) + throws InterruptedException { + final ListShardsRequest listShardsRequest = new ListShardsRequest(); + if (startNextToken == null) { + listShardsRequest.setExclusiveStartShardId(startShardId); + listShardsRequest.setStreamName(streamName); + } else { + // Note the nextToken returned by AWS expires within 300 sec. + listShardsRequest.setNextToken(startNextToken); + } - DescribeStreamResult describeStreamResult = null; + ListShardsResult listShardsResults = null; - // Call DescribeStream, with full-jitter backoff (if we get LimitExceededException). + // Call ListShards, with full-jitter backoff (if we get LimitExceededException). int attemptCount = 0; - while (describeStreamResult == null) { // retry until we get a result + // List Shards returns just the first 1000 shard entries. Make sure that all entries + // are taken up. + while (listShardsResults == null) { // retry until we get a result try { - describeStreamResult = kinesisClient.describeStream(describeStreamRequest); + listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++); - LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for " - + backoffMillis + " millis."); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); - } catch (ResourceNotFoundException re) { - throw new RuntimeException("Error while getting stream details", re); - } - } - - String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus(); - if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString( { - if (LOG.isWarnEnabled()) { - LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " + - "describeStream operation will not contain any shard information."); + } catch (ResourceInUseException reInUse) { + if (LOG.isWarnEnabled()) { + // List Shards will throw an exception if stream in not in active state. Will return + LOG.warn("The stream is currently not in active state. Reusing the older state " + + "for the time being"); + break; + } + } catch (ResourceNotFoundException
[jira] [Closed] (FLINK-9341) Oracle: "Type is not supported: Date"
[ https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ken Geis closed FLINK-9341. --- Resolution: Not A Bug This issue happened because I used BasicTypeInfo.DATE_TYPE_INFO in my input format instead of SqlTimeTypeInfo.TIMESTAMP. I changed that and the error went away. I agree with [~Sergey Nuyanzin]'s suggestion to include the full class name in the exception. > Oracle: "Type is not supported: Date" > - > > Key: FLINK-9341 > URL: https://issues.apache.org/jira/browse/FLINK-9341 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.2 >Reporter: Ken Geis >Priority: Major > > When creating a Table from an Oracle JDBCInputFormat with a date column, I > get the error "Type is not supported: Date". This happens with as simple a > query as > {code:java} > SELECT SYSDATE FROM DUAL{code} > Stack trace: > {noformat} > Caused by: org.apache.flink.table.api.TableException: Type is not supported: > Date > at > org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > ~[scala-library-2.11.11.jar:na] > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > ~[scala-library-2.11.11.jar:na] > at > org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61) > ~[flink-table_2.11-1.4.2.jar:1.4.2] > at > org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown > Source) ~[na:na] > (at my code...) > {noformat}
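The fix amounts to declaring the column with Flink's SQL time types rather than java.util.Date. A sketch against the 1.4.x API (connection details are placeholders):
{code:java}
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Declare the SYSDATE column as a SQL timestamp, which FlinkTypeFactory can map,
// instead of BasicTypeInfo.DATE_TYPE_INFO (java.util.Date), which it cannot.
RowTypeInfo rowTypeInfo = new RowTypeInfo(
    new TypeInformation<?>[] { SqlTimeTypeInfo.TIMESTAMP },
    new String[] { "sysdate_col" });

JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("oracle.jdbc.OracleDriver")
    .setDBUrl("jdbc:oracle:thin:@//host:1521/service") // placeholder URL
    .setQuery("SELECT SYSDATE FROM DUAL")
    .setRowTypeInfo(rowTypeInfo)
    .finish();
{code}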
[jira] [Closed] (FLINK-9362) Document that Flink doesn't guarantee transaction of modifying state and register timers in processElement()
[ https://issues.apache.org/jira/browse/FLINK-9362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-9362. --- Resolution: Fixed > Document that Flink doesn't guarantee transaction of modifying state and > register timers in processElement() > > > Key: FLINK-9362 > URL: https://issues.apache.org/jira/browse/FLINK-9362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9362) Document that Flink doesn't guarantee transaction of modifying state and register timers in processElement()
[ https://issues.apache.org/jira/browse/FLINK-9362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476191#comment-16476191 ] Bowen Li commented on FLINK-9362: - Makes sense. It's not a traditional 'transactional guarantee', though it does provide some consistency within a checkpoint. Closing > Document that Flink doesn't guarantee transaction of modifying state and > register timers in processElement() > > > Key: FLINK-9362 > URL: https://issues.apache.org/jira/browse/FLINK-9362 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Major > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
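For reference, the pattern the ticket documents is a function that both updates keyed state and registers a timer inside processElement(); the two effects are not one runtime transaction, but both are captured by the same checkpoint. A simplified sketch (a keyed stream with event-time timestamps is assumed):
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CountWithTimeout extends ProcessFunction<String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);                     // state update
        // ctx.timestamp() is null on non-event-time streams; assumed present here.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L); // timer registration
    }
}
{code}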
[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers
[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476180#comment-16476180 ] ASF GitHub Bot commented on FLINK-3599: --- Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2211 Hi, There were a number of bugs and painful parts in the code, and unfortunately I'm not really sure whether they are major blockers or easily fixable. It surely wouldn't be an 1-2 day work, but some weeks might be realistic. As an example from just off the top of my head, we choke in situations where the POJO has a generic argument T, and a field of type T, and we tell Flink with a `.returns` about the generic argument. There were some other random test failures in older tests, which would require some debugging. I think we also have the problem that we don't correctly clean up the old generated classes, so they cause a memory leak across Flink jobs. Another potential issue is that since this PR, there was a new mechanism added to the Flink serializers for dealing with compatibility of serializers (e.g., to ensure that savepoints work across Flink versions), and I'm not sure whether it would be straightforward to update this PR for this. (For example, `PojoSerializer.ensureCompatibility` did not exist back then, and it looks a bit scary at first glance.) One of the pain points was shipping generated code from the driver program to the TMs. The problem is that if you simply serialize an instance of a generated class in the driver (for shipping the job to the cluster), the TM cannot just simply deserialize it, since the generated class does not exist in that JVM. The good news here is that since this PR I accidentally stumbled upon a new solution to this (using `writeReplace` and `readResolve`), which would be nicer than the current solution of this PR. Note that there is also this PR for similar code generation for the sorters: https://github.com/apache/flink/pull/3511 This has a lot of similarities, but less pain points, because the sorters are better isolated from the other parts of Flink than the serializers, and also because there we don't have to ship generated code from the driver to the TMs (because the sorters are created on the TMs). So I think if we wanted to push this serializer codegen PR, then first we should start with the sorter codegen PR instead, because it is the easier one of the two. I think that one is actually quite close to being mergable. (Note, that I think we solved the "cleanup between Flink jobs" issue in that PR, and that solution could be adapted to this PR.) Btw. we are in the planning phase of an MSc thesis with (@mukrram-bajwa), whose work might subsume both of these PRs. The idea is to use Graal to do many of the specializations that are in these PRs. The nice thing would be that the modifications to Flink's code would be much less compared to these PRs, as the magic would happen mostly inside a Graal compilation plugin as custom optimization passes. Best, Gábor > GSoC: Code Generation in Serializers > > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Márton Balassi >Assignee: Gabor Horvath >Priority: Major > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. 
the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk -- This message was sent by Atlassian JIRA (v7.6.3#76005)
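The writeReplace/readResolve idea mentioned above is the classic serialization-proxy pattern: the generated class itself is never serialized; a proxy carrying its source travels instead, and the class is regenerated on the receiving JVM. A sketch with entirely hypothetical names (the generated serializer would define a writeReplace() returning such a proxy):
{code:java}
import java.io.Serializable;

public class GeneratedClassProxy implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String className;
    private final String sourceCode;

    public GeneratedClassProxy(String className, String sourceCode) {
        this.className = className;
        this.sourceCode = sourceCode;
    }

    // Invoked by Java deserialization on the TaskManager: recompile the shipped
    // source and hand back an instance of the regenerated class.
    private Object readResolve() {
        return compileAndInstantiate(className, sourceCode);
    }

    private static Object compileAndInstantiate(String className, String sourceCode) {
        // Placeholder: a real implementation would invoke the code generator
        // and an in-memory compiler such as Janino here.
        throw new UnsupportedOperationException("illustrative placeholder");
    }
}
{code}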
[jira] [Updated] (FLINK-7814) Add BETWEEN expression to Table API
[ https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin updated FLINK-7814: --- Description: * The Table API does not have a BETWEEN expression. BETWEEN is quite handy when defining join predicates for window joins. (was: The Table API does not have a BETWEEN expression. BETWEEN is quite handy when defining join predicates for window joins.) > Add BETWEEN expression to Table API > > > Key: FLINK-7814 > URL: https://issues.apache.org/jira/browse/FLINK-7814 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Priority: Minor > > * The Table API does not have a BETWEEN expression. BETWEEN is quite handy > when defining join predicates for window joins. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
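Until such an expression exists, the same predicate can be written with two comparisons. A sketch in the Java Table API's string expressions (the tables and field names are illustrative):
{code:java}
import org.apache.flink.table.api.Table;

// "r_time BETWEEN l_time - 10.minutes AND l_time + 10.minutes" spelled out
// as two comparisons in a window join predicate.
Table result = left.join(right)
    .where("l_key = r_key && r_time >= l_time - 10.minutes && r_time <= l_time + 10.minutes")
    .select("l_key, l_time, r_time");
{code}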
[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...
Github user ggevay commented on the issue: https://github.com/apache/flink/pull/2211 Hi, There were a number of bugs and painful parts in the code, and unfortunately I'm not really sure whether they are major blockers or easily fixable. It surely wouldn't be an 1-2 day work, but some weeks might be realistic. As an example from just off the top of my head, we choke in situations where the POJO has a generic argument T, and a field of type T, and we tell Flink with a `.returns` about the generic argument. There were some other random test failures in older tests, which would require some debugging. I think we also have the problem that we don't correctly clean up the old generated classes, so they cause a memory leak across Flink jobs. Another potential issue is that since this PR, there was a new mechanism added to the Flink serializers for dealing with compatibility of serializers (e.g., to ensure that savepoints work across Flink versions), and I'm not sure whether it would be straightforward to update this PR for this. (For example, `PojoSerializer.ensureCompatibility` did not exist back then, and it looks a bit scary at first glance.) One of the pain points was shipping generated code from the driver program to the TMs. The problem is that if you simply serialize an instance of a generated class in the driver (for shipping the job to the cluster), the TM cannot just simply deserialize it, since the generated class does not exist in that JVM. The good news here is that since this PR I accidentally stumbled upon a new solution to this (using `writeReplace` and `readResolve`), which would be nicer than the current solution of this PR. Note that there is also this PR for similar code generation for the sorters: https://github.com/apache/flink/pull/3511 This has a lot of similarities, but less pain points, because the sorters are better isolated from the other parts of Flink than the serializers, and also because there we don't have to ship generated code from the driver to the TMs (because the sorters are created on the TMs). So I think if we wanted to push this serializer codegen PR, then first we should start with the sorter codegen PR instead, because it is the easier one of the two. I think that one is actually quite close to being mergable. (Note, that I think we solved the "cleanup between Flink jobs" issue in that PR, and that solution could be adapted to this PR.) Btw. we are in the planning phase of an MSc thesis with (@mukrram-bajwa), whose work might subsume both of these PRs. The idea is to use Graal to do many of the specializations that are in these PRs. The nice thing would be that the modifications to Flink's code would be much less compared to these PRs, as the magic would happen mostly inside a Graal compilation plugin as custom optimization passes. Best, Gábor ---
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476118#comment-16476118 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188321914 --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent; + +import org.apache.flink.formats.avro.AvroDeserializationSchema; +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses + * Confluent Schema Registry. + * + * @param type of record it produces + */ +public class ConfluentRegistryAvroDeserializationSchema + extends RegistryAvroDeserializationSchema { + + private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; + + /** +* Creates a Avro deserialization schema. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry +*/ + private ConfluentRegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader, schemaCoderProvider); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. +* +* @param schema schema of produced records +* @param urlurl of schema registry to connect +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url) { + return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY); + } + + /** +* Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord} +* using provided reader schema and looks up writer schema in Confluent Schema Registry. 
+* +* @param schema schema of produced records +* @param url url of schema registry to connect +* @param identityMapCapacity maximum number of cached schema versions (default: 1000) +* @return deserialized record in form of {@link GenericRecord} +*/ + public static ConfluentRegistryAvroDeserializationSchema forGeneric( + Schema schema, String url, int identityMapCapacity) { + return new ConfluentRegistryAvroDeserializationSchema<>( + GenericRecord.class, + schema, + new CachedSchemaCoderProvider(url, identityMapCapacity)); + } + + /** +* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro +* schema and looks up writer schema in Confluent Schema Registry.
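Based on the factory methods visible in this diff, wiring the schema into a Kafka source could look roughly like this (the PR was still under review, so names may have changed; broker and registry addresses are placeholders):
{code:java}
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

// Reader schema for the consumed records; the writer schema is looked up
// in the Confluent Schema Registry per record.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"T\",\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}");

ConfluentRegistryAvroDeserializationSchema<GenericRecord> deserializer =
    ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://registry:8081");

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");

FlinkKafkaConsumer011<GenericRecord> consumer =
    new FlinkKafkaConsumer011<>("topic", deserializer, props);
{code}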
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476133#comment-16476133 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188350196 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. 
Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); + this.schemaCoderProvider = schemaCoderProvider; + this.schemaCoder = schemaCoderProvider.get(); + } + + @Override + public T deserialize(byte[] message) { + // read record + try { + getInputStream().setBuffer(message); + Schema writerSchema = schemaCoder.readSchema(getInputStream()); + Schema readerSchema = getReaderSchema(); + + GenericDatumReader datumReader = getDatumReader(); + + datumReader.setSchema(writerSchema); + datumReader.setExpected(readerSchema); + + return datumReader.read(null, getDecoder()); + } catch (Exception e) { + throw new RuntimeException("Failed to deserialize Row.", e); + } + } + + private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { --- End diff -- See above, would suggest to avoid readObject() > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476124#comment-16476124 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328437 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* --- End diff -- Double Apache header > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476137#comment-16476137 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188349853 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}. + * + * @param type of record it produces + */ +public class RegistryAvroDeserializationSchema extends AvroDeserializationSchema { + private final SchemaCoder.SchemaCoderProvider schemaCoderProvider; + private transient SchemaCoder schemaCoder; + + /** +* Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}. +* +* @param recordClazz class to which deserialize. Should be either +*{@link SpecificRecord} or {@link GenericRecord}. +* @param reader reader's Avro schema. Should be provided if recordClazz is +*{@link GenericRecord} +* @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for +*schema reading +*/ + protected RegistryAvroDeserializationSchema( + Class recordClazz, + @Nullable Schema reader, + SchemaCoder.SchemaCoderProvider schemaCoderProvider) { + super(recordClazz, reader); --- End diff -- Empty line / indentation > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476127#comment-16476127 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188347289 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- This class uses a mixture of eager initialization of transient members (in readObject()) and lazy initialization (in getDatumReader()). I would suggest to do it all one way or the other. My suggestion would be to avoid `readObject()` whenever possible. If you encounter an exception during schema parsing (and it may be something weird from Jackson, like a missing manifest due to a shading issue), you will get the most unhelpful exception stack trace ever, in the weirdest place (like Flink's RPC message decoder). In my experience, when a user sees such a stack trace, they are rarely able to diagnose that. Best case they show up on the mailing list, worst case they give up. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476126#comment-16476126 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188348636 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* --- End diff -- That would mean initializing it all lazily, in `getDatumReader()` or in a `checkAvroInitialized()` method. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
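The lazy-initialization pattern suggested here can be sketched as below (a minimal standalone example, not the actual Flink class): all transient members are built on first use, so a schema-parsing failure surfaces in deserialize() with a meaningful stack trace instead of inside Java deserialization.
{code:java}
import java.io.IOException;
import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class LazyAvroDeserializer implements Serializable {

    private final String schemaString; // the only field that is actually serialized

    private transient Schema readerSchema;
    private transient GenericDatumReader<GenericRecord> datumReader;
    private transient BinaryDecoder decoder;

    public LazyAvroDeserializer(Schema schema) {
        this.schemaString = schema.toString();
    }

    private void checkAvroInitialized() {
        if (datumReader != null) {
            return;
        }
        // Any parsing failure now happens here, on the first deserialize() call.
        readerSchema = new Schema.Parser().parse(schemaString);
        datumReader = new GenericDatumReader<>(readerSchema);
    }

    public GenericRecord deserialize(byte[] message) throws IOException {
        checkAvroInitialized();
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        return datumReader.read(null, decoder);
    }
}
{code}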
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476135#comment-16476135 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188355390 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java --- @@ -99,9 +108,29 @@ /** * Creates a new AvroSerializer for the type indicated by the given class. +* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer. +* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)} */ public AvroSerializer(Class type) { + Preconditions.checkArgument(!isGenericRecord(type), --- End diff -- Minor: Other preconditions checks in this class are done by statically imported methods. While this is not consistent within the code base, I would suggest to keep this consistent within a class as much as possible. > Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476129#comment-16476129 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188338897 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; --- End diff -- You can make this variable final as well. 
> Implement AvroDeserializationSchema > --- > > Key: FLINK-9337 > URL: https://issues.apache.org/jira/browse/FLINK-9337 > Project: Flink > Issue Type: New Feature >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476130#comment-16476130 ] ASF GitHub Bot commented on FLINK-9337: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/5995#discussion_r188328926 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream; +import org.apache.flink.util.Preconditions; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificRecord; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.io.ObjectInputStream; + +/** + * Deserialization schema that deserializes from Avro binary format. + * + * @param type of record it produces + */ +public class AvroDeserializationSchema implements DeserializationSchema { + + /** +* Class to deserialize to. +*/ + private Class recordClazz; + + private String schemaString = null; --- End diff -- I would avoid null initialization, it is redundant. 
    It actually does nothing (the fields are null anyway) but it still exists as byte code, hence costs CPU cycles.
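To make the point concrete, here is a minimal sketch (hypothetical class names, not code from the pull request) contrasting the flagged pattern with the preferred one. The JVM zeroes reference fields before any constructor body runs, so an explicit `= null` initializer only adds a redundant assignment to the compiled constructor:

    // Hypothetical example. Redundant: javac still emits the null assignment
    // in the constructor, even though the field would be null anyway.
    public class RedundantInit {
        private String schemaString = null;
    }

    // Preferred: rely on the default value; the field is still null.
    class DefaultInit {
        private String schemaString;
    }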
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476120#comment-16476120 ]

ASF GitHub Bot commented on FLINK-9337:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188325819

    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import org.apache.avro.Schema;
    +
    +import java.io.DataInputStream;
    +import java.io.InputStream;
    +
    +/**
    + * Reads schema using Confluent Schema Registry protocol.
    + */
    +public class ConfluentSchemaRegistryCoder implements SchemaCoder {
    +
    +	private final SchemaRegistryClient schemaRegistryClient;
    +
    +	/**
    +	 * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
    +	 * schema registry.
    +	 *
    +	 * @param schemaRegistryClient client to connect schema registry
    +	 */
    +	public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
    +		this.schemaRegistryClient = schemaRegistryClient;
    +	}
    +
    +	@Override
    +	public Schema readSchema(InputStream in) throws Exception {
    +		DataInputStream dataInputStream = new DataInputStream(in);
    +
    +		if (dataInputStream.readByte() != 0) {
    +			throw new RuntimeException("Unknown data format. Magic number does not match");
    --- End diff --

    RuntimeExceptions (unchecked exceptions) are usually used to indicate programming errors, or (as a workaround) if the scope does not allow throwing any exception. This here is a case for a checked exception, in my opinion, like an `IOException`, `FlinkException`, etc.
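A sketch of the direction the comment suggests, assuming the signature may declare `IOException` (the class name here is hypothetical, and the actual fix merged into Flink may differ). A bad magic byte means the input itself is malformed, which is an I/O-level failure rather than a programming error:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class MagicByteCheck {

        // Checked exception: callers must handle or declare the failure,
        // rather than having it escape as an unchecked RuntimeException.
        static void checkMagicByte(InputStream in) throws IOException {
            DataInputStream dataInputStream = new DataInputStream(in);
            if (dataInputStream.readByte() != 0) {
                throw new IOException("Unknown data format. Magic number does not match");
            }
        }
    }

A `FlinkException` would serve the same purpose; the point is only that the failure becomes part of the method's contract.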
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476136#comment-16476136 ]

ASF GitHub Bot commented on FLINK-9337:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188353074

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -79,15 +87,16 @@
     	// -------- runtime fields, non-serializable, lazily initialized --------
    -	private transient SpecificDatumWriter<T> writer;
    -	private transient SpecificDatumReader<T> reader;
    +	private transient GenericDatumWriter<T> writer;
    +	private transient GenericDatumReader<T> reader;
     	private transient DataOutputEncoder encoder;
     	private transient DataInputDecoder decoder;
    -	private transient SpecificData avroData;
    +	private transient GenericData avroData;
     	private transient Schema schema;
    +	private final String schemaString;
    --- End diff --

    As per the comments, the existing code orders config fields before runtime fields. Can you place the schema to match that pattern?
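The convention the comment refers to, sketched with hypothetical names: serializable configuration fields are grouped first, transient runtime fields (rebuilt lazily after deserialization) follow, so the schema string belongs in the first group:

    import java.io.Serializable;

    import org.apache.avro.Schema;

    // Hypothetical illustration of the field-ordering convention.
    public class ExampleSerializer implements Serializable {

        private static final long serialVersionUID = 1L;

        // -------- configuration fields, serialized with the object --------
        private final String schemaString;

        // -------- runtime fields, non-serializable, lazily initialized --------
        private transient Schema schema;

        public ExampleSerializer(String schemaString) {
            this.schemaString = schemaString;
        }
    }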
[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476121#comment-16476121 ]

ASF GitHub Bot commented on FLINK-9337:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188337378

    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    --- End diff --

    This should have a serial version UID. You can activate the respective inspections in IntelliJ to warn about such issues.
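For context: `DeserializationSchema` extends `Serializable`, so the class is Java-serializable and benefits from an explicit UID. Without one, the JVM derives a UID from the class structure, and adding or reordering fields later would break deserialization of already-serialized instances. A minimal sketch (the stand-in interface and the UID value are illustrative, not the final code):

    import java.io.Serializable;

    // Serializable stands in for the DeserializationSchema interface here
    // so the sketch is self-contained.
    public class AvroDeserializationSchemaSketch<T> implements Serializable {

        // Pins the serialized form across compatible class changes.
        private static final long serialVersionUID = 1L; // illustrative value

        private Class<T> recordClazz;
    }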