[jira] [Updated] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-15 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Description: Currently, when using a RocksIterator we only use 
_iterator.isValid()_ to check whether we have reached the end of the iterator. 
But that is not enough: according to RocksDB's wiki 
(https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), even if 
_iterator.isValid()_ returns true there may still be an internal error. A safer 
way to use the _RocksIterator_ is to always call _iterator.status()_ to check 
for internal RocksDB errors. There is a case reported on the user mailing list 
that appears to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
  (was: Currently, when using a RocksIterator we only use 
_iterator.isValid()_ to check whether we have reached the end of the iterator. 
But that is not enough: according to RocksDB's wiki 
(https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), 
_iterator.isValid()_ may also be affected by an internal error. A safer way to 
use the _RocksIterator_ is to always call _iterator.status()_ to check for 
internal RocksDB errors. There is a case reported on the user mailing list that 
appears to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html)

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: according to RocksDB's wiki 
> (https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), even if 
> _iterator.isValid()_ returns true there may still be an internal error. A 
> safer way to use the _RocksIterator_ is to always call _iterator.status()_ to 
> check for internal RocksDB errors. There is a case reported on the user 
> mailing list that appears to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
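
For illustration, a minimal sketch of the safer usage pattern described above, 
using the RocksDB Java API (RocksJava); the surrounding class, the method name 
and the processing body are placeholders rather than Flink code:

{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class SafeIteration {

    private SafeIteration() {}

    /**
     * Iterates over all entries and then consults iterator.status().
     * isValid() == false can mean either "end of data" or "internal error",
     * so relying on isValid() alone can silently drop data.
     */
    public static void scanAll(RocksIterator iterator) throws RocksDBException {
        for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
            byte[] key = iterator.key();
            byte[] value = iterator.value();
            // ... process key/value ...
        }
        // Throws a RocksDBException if RocksDB hit an internal error during iteration.
        iterator.status();
    }
}
{code}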



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-15 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reopened FLINK-9373:
---

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: according to RocksDB's wiki 
> (https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), 
> _iterator.isValid()_ may also be affected by an internal error. A safer way 
> to use the _RocksIterator_ is to always call _iterator.status()_ to check for 
> internal RocksDB errors. There is a case reported on the user mailing list 
> that appears to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-15 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9373.
-
Resolution: Invalid

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: according to RocksDB's wiki 
> (https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), 
> _iterator.isValid()_ may also be affected by an internal error. A safer way 
> to use the _RocksIterator_ is to always call _iterator.status()_ to check for 
> internal RocksDB errors. There is a case reported on the user mailing list 
> that appears to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-15 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9373:
-

 Summary: Always call RocksIterator.status() to check the internal 
error of RocksDB
 Key: FLINK-9373
 URL: https://issues.apache.org/jira/browse/FLINK-9373
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
check whether we have reached the end of the iterator. But that is not enough: 
according to RocksDB's wiki 
(https://github.com/facebook/rocksdb/wiki/Iterator#error-handling), 
_iterator.isValid()_ may also be affected by an internal error. A safer way to 
use the _RocksIterator_ is to always call _iterator.status()_ to check for 
internal RocksDB errors. There is a case reported on the user mailing list that 
appears to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476930#comment-16476930
 ] 

ASF GitHub Bot commented on FLINK-9292:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5970
  
cc @tzulitai 


> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated in favor of {{TypeHint}}.
> Because the {{TypeInfoParser}} also does not work correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.
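
As a point of reference, a minimal sketch of the replacement pattern; the class 
and variable names here are illustrative, not taken from the Flink codebase:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeHintExample {
    public static void main(String[] args) {
        // Instead of parsing a type from a string with the deprecated TypeInfoParser,
        // an anonymous TypeHint subclass captures the full generic type directly,
        // so no string parsing (and no lookup of classes by name) is involved.
        TypeInformation<Tuple2<String, Integer>> info =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});

        System.out.println(info);
    }
}
{code}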



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-15 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5970
  
cc @tzulitai 


---


[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476927#comment-16476927
 ] 

ASF GitHub Bot commented on FLINK-9299:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6001
  
cc @tzulitai @fhueske 


> ProcessWindowFunction documentation Java examples have errors
> -
>
> Key: FLINK-9299
> URL: https://issues.apache.org/jira/browse/FLINK-9299
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> In looking at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation],
>  I noticed a few errors...
>  * "This allows to incrementally compute windows" should be "This allows it 
> to incrementally compute windows"
>  * DataStream<Tuple2<String, Long> input = ...; should be 
> DataStream<Tuple2<String, Long>> input = ...;
>  * The getResult() method needs to cast one of the accumulator values to a 
> double, if that's what it is going to return.
>  * MyProcessWindowFunction needs to extend, not implement 
> ProcessWindowFunction
>  * MyProcessWindowFunction needs to implement a process() method, not an 
> apply() method.
>  * The call to .timeWindow takes a Time parameter, not a window assigner.
>  
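
With those fixes applied, the corrected documentation example would look 
roughly like the following sketch (based on the Flink 1.4 
"ProcessWindowFunction with Incremental Aggregation" docs section; the input 
source and the key selector remain placeholders, as in the docs):

{code:java}
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .timeWindow(Time.minutes(5))   // takes a Time, not a window assigner
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Incrementally maintains <sum, count> and emits the average as a Double.
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        // cast to double so the division does not truncate
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// Extends (not implements) ProcessWindowFunction and overrides process(), not apply().
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        Double average = averages.iterator().next();
        out.collect(new Tuple2<>(key, average));
    }
}
{code}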



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...

2018-05-15 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6001
  
cc @tzulitai @fhueske 


---


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476908#comment-16476908
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511258
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476909#comment-16476909
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512862
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476905#comment-16476905
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511081
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476910#comment-16476910
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512949
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476907#comment-16476907
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512804
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511081
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476906#comment-16476906
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511280
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteri

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512949
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511258
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512862
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512804
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511280
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   env.setStateBackend(new RocksDBStateBackend(new 
MemoryStateBa

[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476884#comment-16476884
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

GitHub user makeyang opened a pull request:

https://github.com/apache/flink/pull/6019

[FLINK-9182]async checkpoints for timer service

## What is the purpose of the change

It adds asynchronous checkpoints for the timer service.
The whole idea is based on the discussion in the previous PR for FLINK-9182: 
https://github.com/apache/flink/pull/5908

## Brief change log

in the sync part, make a flat copy of the internal array of the priority queue
in the async part, build the key groups and write the timer key groups

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/makeyang/flink FLINK-9182-version2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6019


commit 82799922203bd6cb959c11336f71aee4def431d7
Author: makeyang 
Date:   2018-05-16T05:44:16Z

[FLINK-9182]async checkpoints for timer service
the whole idea is based on discussion on github: 
https://github.com/apache/flink/pull/5908
the idea was proposed by StefanRRichter as below: 
"Second, I would probably suggest a simpler model for the async snapshots. 
You dropped the idea of making flat copies, but I wonder if this was premature. 
I can see that calling set.toArray(...) per keygroup could (maybe) turn out a 
bit slow because it has to potentially iterate and flatten linked entries. 
However, with async snapshots, we could get rid of the key-group partitioning 
of sets, and instead do a flat copy of the internal array of the priority 
queue. This would translate to just a single memcopy call internally, which is 
very efficient. In the async part, we can still partition the timers by 
key-group in a similar way as the copy-on-write state table does. This would 
avoid slowing down the event processing path (in fact improving it by unifying 
the sets) and also keep the approach very straight forward and less invasive."
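
To make the proposed flow concrete, here is a minimal sketch of the idea, using simplified, 
hypothetical names (`TimerHeapSnapshot` and `KeyGroupAssigner` are illustrative only, not 
actual Flink classes): the synchronous part makes a single flat copy of the heap's backing 
array, and the asynchronous part partitions the copied timers by key group before writing them.

```
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of "flat copy synchronously, partition by key group asynchronously". */
final class TimerHeapSnapshot<T> {

    private final Object[] flatCopy;     // taken synchronously, one array copy
    private final int numKeyGroups;

    /** Synchronous part: a single flat copy of the priority queue's internal array. */
    TimerHeapSnapshot(Object[] queueArray, int size, int numKeyGroups) {
        this.flatCopy = Arrays.copyOf(queueArray, size);
        this.numKeyGroups = numKeyGroups;
    }

    /** Asynchronous part: partition the copied timers by key group before writing them. */
    List<List<T>> partitionByKeyGroup(KeyGroupAssigner<T> assigner) {
        List<List<T>> keyGroups = new ArrayList<>(numKeyGroups);
        for (int i = 0; i < numKeyGroups; i++) {
            keyGroups.add(new ArrayList<T>());
        }
        for (Object element : flatCopy) {
            @SuppressWarnings("unchecked")
            T timer = (T) element;
            keyGroups.get(assigner.keyGroupFor(timer)).add(timer);
        }
        return keyGroups;
    }

    /** Assumption: some way of mapping a timer to its key group, e.g. from the timer's key. */
    interface KeyGroupAssigner<T> {
        int keyGroupFor(T timer);
    }
}
```

The single `Arrays.copyOf` keeps the synchronous pause small; all per-key-group work then 
happens off the event-processing path.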




> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects grows, the checkpoint becomes 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion, which is exactly the same thing as in 
> CopyOnWriteStateTable, and snapshotVersions, which is exactly the same thing 
> as in CopyOnWriteStateTable, in InternalTimeServiceManager. one more thing: a 
> read-write lock, which is used to protect snapshotVersions and 
> stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, besides the 3 existing properties: timestamp, key and namespace. each 
> time a timer is registered in the timer service, it is created with 
> stateTableVersion as its create version while its delete version is -1. each 
> time a timer is deleted in the timer service, it is marked as deleted by 
> giving it a delete version equal to stateTableVersion, without physically 
> deleting it from the timer service.
>  ## each time we try to snapshot the timers, InternalTimeServiceManager 
> increases its stateTableVersion and adds this stateTableVersion to 
> snapshotVersions. these 2 operations are protected by the write lock of 
> InternalTimeServiceManager. the current stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## shallow copy the tuples
>  ## then use another thread to asynchronously snapshot the whole thing: key 
> serializer, namespace serializer and timers. timers which are not deleted 
> (delete version is -1) and whose create version is less than the snapshot 
> version are serialized. for timers whose de
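
A minimal sketch of the version bookkeeping described above, with simplified, hypothetical 
names; the real `InternalTimer` carries timestamp, key and namespace, and the filter below 
only covers the non-deleted case spelled out before the message is cut off.

```
/** Sketch of the create/delete version tagging described in the issue. */
final class VersionedTimer {

    final long timestamp;            // one of the existing timer properties
    final long createVersion;        // stateTableVersion at registration time
    long deleteVersion = -1;         // -1 means "not deleted"

    VersionedTimer(long timestamp, long createVersion) {
        this.timestamp = timestamp;
        this.createVersion = createVersion;
    }

    /** A timer is serialized for a snapshot if it is not deleted and was created
     *  before that snapshot's version; handling of timers deleted after the
     *  snapshot was taken is cut off in the description above. */
    boolean belongsToSnapshot(long snapshotVersion) {
        return deleteVersion == -1 && createVersion < snapshotVersion;
    }
}
```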

[GitHub] flink pull request #6019: [FLINK-9182]async checkpoints for timer service

2018-05-15 Thread makeyang
GitHub user makeyang opened a pull request:

https://github.com/apache/flink/pull/6019

[FLINK-9182]async checkpoints for timer service

## What is the purpose of the change

It adds asynchronous checkpoints for the timer service.
The whole idea is based on the discussion in the previous PR for FLINK-9182: 
https://github.com/apache/flink/pull/5908

## Brief change log

in the sync part, make a flat copy of the internal array of the priority queue
in the async part, build the key groups and write the timer key groups

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/makeyang/flink FLINK-9182-version2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6019


commit 82799922203bd6cb959c11336f71aee4def431d7
Author: makeyang 
Date:   2018-05-16T05:44:16Z

[FLINK-9182]async checkpoints for timer service
the whole idea is based on discussion on github: 
https://github.com/apache/flink/pull/5908
the idea was proposed by StefanRRichter as below: 
"Second, I would probably suggest a simpler model for the async snapshots. 
You dropped the idea of making flat copies, but I wonder if this was premature. 
I can see that calling set.toArray(...) per keygroup could (maybe) turn out a 
bit slow because it has to potentially iterate and flatten linked entries. 
However, with async snapshots, we could get rid of the key-group partitioning 
of sets, and instead do a flat copy of the internal array of the priority 
queue. This would translate to just a single memcopy call internally, which is 
very efficient. In the async part, we can still partition the timers by 
key-group in a similar way as the copy-on-write state table does. This would 
avoid slowing down the event processing path (in fact improving it by unifying 
the sets) and also keep the approach very straight forward and less invasive."




---


[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476866#comment-16476866
 ] 

ASF GitHub Bot commented on FLINK-9354:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188507930
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -43,121 +43,151 @@ EXIT_CODE=0
 # printf 
"\n==\n"
 # printf "Running my fancy nightly end-to-end test\n"
 # printf 
"==\n"
+# start_timer
 # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh
 # EXIT_CODE=$?
+# end_timer
 # fi
 
 
 if [ $EXIT_CODE == 0 ]; then
 printf 
"\n==\n"
 printf "Running HA end-to-end test\n"
 printf 
"==\n"
+start_timer
 $END_TO_END_DIR/test-scripts/test_ha.sh
 EXIT_CODE=$?
+end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, scale up) end-to-end 
test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
   EXIT_CODE=$?
+  end_timer
 fi
--- End diff --

Perhaps we can refactor this whole block into a common base script for the 
nightly / pre-commit hook scripts.
The end_timer / start_timer functions should also be located there.
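
For illustration, helpers of roughly this shape could be shared from `common.sh` 
(a sketch only; the actual functions in the PR may differ):

```
#!/usr/bin/env bash
# Sketch: timer helpers that could live in test-scripts/common.sh.

TEST_TIMER_START=0

function start_timer {
    TEST_TIMER_START=$(date +%s)
}

function end_timer {
    local end_time
    end_time=$(date +%s)
    echo "Test took $((end_time - TEST_TIMER_START)) seconds."
}
```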


> print execution times for end-to-end tests
> --
>
> Key: FLINK-9354
> URL: https://issues.apache.org/jira/browse/FLINK-9354
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> We need to modify the end-to-end scripts to include the time it takes for a 
> test to run.
> We currently don't have any clue how long a test actually runs for.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...

2018-05-15 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188507930
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -43,121 +43,151 @@ EXIT_CODE=0
 # printf 
"\n==\n"
 # printf "Running my fancy nightly end-to-end test\n"
 # printf 
"==\n"
+# start_timer
 # $END_TO_END_DIR/test-scripts/test_something_very_fancy.sh
 # EXIT_CODE=$?
+# end_timer
 # fi
 
 
 if [ $EXIT_CODE == 0 ]; then
 printf 
"\n==\n"
 printf "Running HA end-to-end test\n"
 printf 
"==\n"
+start_timer
 $END_TO_END_DIR/test-scripts/test_ha.sh
 EXIT_CODE=$?
+end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
+  end_timer
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf 
"\n==\n"
   printf "Running Resuming Savepoint (file, async, scale up) end-to-end 
test\n"
   printf 
"==\n"
+  start_timer
   STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true 
$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
   EXIT_CODE=$?
+  end_timer
 fi
--- End diff --

Perhaps we can refactor this whole block into a common base script for the 
nightly / pre-commit hook scripts.
The end_timer / start_timer functions should also be located there.


---


[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476864#comment-16476864
 ] 

ASF GitHub Bot commented on FLINK-9354:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6008
  
I see these in the Travis logs:
```
flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: 
command not found
...
flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command 
not found
```

I think you'll have to include `common.sh` in the nightly / pre-commit hook 
scripts.
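
For example, the nightly / pre-commit scripts could pull in the helpers before the first 
`start_timer` call; the relative path below is an assumption based on how the other test 
scripts are invoked in this thread:

```
# Sketch: make start_timer / end_timer available in run-pre-commit-tests.sh.
END_TO_END_DIR="$(dirname "$0")"
source "${END_TO_END_DIR}/test-scripts/common.sh"
```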


> print execution times for end-to-end tests
> --
>
> Key: FLINK-9354
> URL: https://issues.apache.org/jira/browse/FLINK-9354
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> We need to modify the end-to-end scripts to include the time it takes for a 
> test to run.
> We currently don't have any clue how long a test actually runs for.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6008
  
I see these in the Travis logs:
```
flink-end-to-end-tests/run-pre-commit-tests.sh: line 94: start_timer: 
command not found
...
flink-end-to-end-tests/run-pre-commit-tests.sh: line 97: end_timer: command 
not found
```

I think you'll have to include `common.sh` in the nightly / pre-commit hook 
scripts.


---


[GitHub] flink issue #6018: [FLINK-9372] Typo on Elasticsearch website link (elastic....

2018-05-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6018
  
Thanks for catching this @medcv!
Apparently elastic.io directs to a completely different website 😅.
Merging this ..


---


[jira] [Commented] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476855#comment-16476855
 ] 

ASF GitHub Bot commented on FLINK-9372:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6018
  
Thanks for catching this @medcv!
Apparently elastic.io directs to a completely different website 😅.
Merging this ..


> Typo on Elasticsearch website link (elastic.io --> elastic.co)
> --
>
> Key: FLINK-9372
> URL: https://issues.apache.org/jira/browse/FLINK-9372
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, ElasticSearch Connector
>Affects Versions: 1.5.0, 1.4.1, 1.4.2, 1.5.1
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>
> Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7814) Add BETWEEN expression to Table API

2018-05-15 Thread Ruidong Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ruidong Li reassigned FLINK-7814:
-

Assignee: Ruidong Li

> Add BETWEEN expression to Table API 
> 
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.
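
For reference, `BETWEEN low AND high` is standard SQL shorthand for a pair of comparisons, 
which is also how the predicate has to be spelled out in the Table API today. A hedged 
sketch with made-up table and column names (the table registration is omitted):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BetweenExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Assume a table "Orders" with a numeric column "amount" has been registered.
        Table viaSql = tEnv.sqlQuery("SELECT * FROM Orders WHERE amount BETWEEN 10 AND 20");

        // Without a dedicated between() expression, the Table API needs both comparisons:
        Table viaTableApi = tEnv.scan("Orders").where("amount >= 10 && amount <= 20");
    }
}
```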



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW for EXTRACT

2018-05-15 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6007
  
yes, done


---


[jira] [Commented] (FLINK-8518) Support DOW, EPOCH, DECADE for EXTRACT

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476801#comment-16476801
 ] 

ASF GitHub Bot commented on FLINK-8518:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6007
  
yes, done


> Support DOW, EPOCH, DECADE for EXTRACT
> --
>
> Key: FLINK-8518
> URL: https://issues.apache.org/jira/browse/FLINK-8518
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> We upgraded Calcite to version 1.15 in FLINK-7934. The EXTRACT method 
> supports more conversion targets. The targets DOW, EPOCH, DECADE should be 
> implemented and tested for different datatypes.
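
Once supported, queries of roughly this shape could exercise the new targets (a sketch only; 
the table name "Events" and TIMESTAMP column "ts" are made up and their registration is omitted):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class ExtractTargetsExample {
    public static void main(String[] args) {
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(
                StreamExecutionEnvironment.getExecutionEnvironment());

        // These conversion targets only work once FLINK-8518 is implemented.
        Table q = tEnv.sqlQuery(
                "SELECT EXTRACT(DOW FROM ts), EXTRACT(EPOCH FROM ts), EXTRACT(DECADE FROM ts) FROM Events");
    }
}
```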



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476733#comment-16476733
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486521
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.{JobManager, 
SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
--- End diff --

It is already separated out as a hotfix. We have two commits here.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program
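
Step 4 is the least obvious one; a condensed sketch of such a check is shown below (the 
package prefixes are examples, and the full test script in PR #5823 above is the 
authoritative version):

```
#!/usr/bin/env bash
# Sketch: fail if Flink core classes ended up in the quickstart fat jar.
JAR=target/flink-java-project-0.1.jar
jar tf "$JAR" > contentsInJar.txt

if grep -q -e "org/apache/flink/api/java" -e "org/apache/flink/streaming/api" contentsInJar.txt; then
    echo "Failure: Flink core classes are contained in the jar."
    exit 1
fi
echo "Success: no Flink core classes are contained in the jar."
```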



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486521
  
--- Diff: 
flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.{JobManager, 
SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
--- End diff --

It is already separated out as a hotfix. We have two commits here.


---


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476730#comment-16476730
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486388
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
--- End diff --

Yes. Will change.


> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476731#comment-16476731
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486404
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
+
+touch $T

[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476732#comment-16476732
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486423
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
--- End diff --

Will do.


> End-to-end test: Quickstarts
> 
>
> Key

[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486423
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
--- End diff --

Will do.


---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486404
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
+
+touch $TEST_DATA_DIR/output
+
+curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > 
$TEST_DATA_DIR/output
+
+if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+echo "Quickstarts end to end test pass."
+else
+ 

[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486388
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
--- End diff --

Yes. Will change.


---


[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476724#comment-16476724
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486209
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c 
"org/apache/flink/quickstart/ElasticsearchStreamingJob.class" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" 
contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: Since ElasticsearchStreamingJob.class and other user 
classes are not included in the jar. "
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes 
are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c 
org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
+
+touch $T

[jira] [Commented] (FLINK-9008) End-to-end test: Quickstarts

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476725#comment-16476725
 ] 

ASF GitHub Bot commented on FLINK-9008:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486224
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End to end test for quick starts test.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp 
$CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java
 $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '// {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+\
+org.apache.flink\

+flink-connector-elasticsearch2_${scala.binary.version}\
+${flink.version}\
+' pom.xml
+
+sed -i -e 
"s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/"
 pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' 
&& \
+  `grep -c "org/apache/flink/streaming/experimental" 
contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq 
'0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq 
'0' ]]; then
+
+echo "Success: There are no flink core classes are contained in the 
jar."
+else
+echo "Failure: There are flink core classes are contained in the jar."
+exit 1
--- End diff --

Will do.



> End-to-end test: Quickstarts
> 
>
> Key: FLINK-9008
> URL: https://issues.apache.org/jira/browse/FLINK-9008
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We could add an end-to-end test which verifies Flink's quickstarts. It should 
> do the following:
> # create a new Flink project using the quickstarts archetype 
> # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library) 
> # run {{mvn clean package -Pbuild-jar}}
> # verify that no core dependencies are contained in the jar file
> # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486224
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End-to-end test for the quickstarts.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+echo "Success: No flink core classes are contained in the jar."
+else
+echo "Failure: Flink core classes are contained in the jar."
+exit 1
--- End diff --

Will do.



---


[GitHub] flink pull request #5823: [FLINK-9008] [e2e] Implements quickstarts end to e...

2018-05-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5823#discussion_r188486209
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# End-to-end test for the quickstarts.
+
+CURRENT_DIR=$(cd "$( dirname "$0"  )" && pwd )
+
+cd $CURRENT_DIR
+
+mvn archetype:generate \
+-DarchetypeGroupId=org.apache.flink\
+-DarchetypeArtifactId=flink-quickstart-java\
+-DarchetypeVersion=1.5-SNAPSHOT\
+-DgroupId=org.apache.flink.quickstart  \
+-DartifactId=flink-java-project\
+-Dversion=0.1  \
+-Dpackage=org.apache.flink.quickstart  \
+-DinteractiveMode=false
+
+cd flink-java-project
+
+cp $CURRENT_DIR/../flink-quickstart-test/src/main/java/org/apache/flink/quickstart/ElasticsearchStreamingJob.java $CURRENT_DIR/flink-java-project/src/main/java/org/apache/flink/quickstart/
+
+position=$(awk '/<dependencies>/ {print NR}' pom.xml | head -1)
+
+sed -i -e ''"$(($position + 1))"'i\
+<dependency>\
+<groupId>org.apache.flink</groupId>\
+<artifactId>flink-connector-elasticsearch2_${scala.binary.version}</artifactId>\
+<version>${flink.version}</version>\
+</dependency>' pom.xml
+
+sed -i -e "s/org.apache.flink.quickstart.StreamingJob/org.apache.flink.quickstart.ElasticsearchStreamingJob/" pom.xml
+
+mvn clean package -nsu
+
+cd target
+jar tvf flink-java-project-0.1.jar > contentsInJar.txt
+
+if [[ `grep -c "org/apache/flink/api/java" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/api" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/experimental" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/runtime" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/util" contentsInJar.txt` -eq '0' ]]; then
+
+echo "Success: No flink core classes are contained in the jar."
+else
+echo "Failure: Flink core classes are contained in the jar."
+exit 1
+fi
+
+if [[ `grep -c "org/apache/flink/quickstart/StreamingJob.class" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/quickstart/ElasticsearchStreamingJob.class" contentsInJar.txt` -eq '0' && \
+  `grep -c "org/apache/flink/streaming/connectors/elasticsearch2" contentsInJar.txt` -eq '0' ]]; then
+
+echo "Failure: ElasticsearchStreamingJob.class and other user classes are not included in the jar."
+exit 1
+else
+echo "Success: ElasticsearchStreamingJob.class and other user classes are included in the jar."
+fi
+
+cd $CURRENT_DIR
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+mkdir -p $TEST_DATA_DIR
+

+ELASTICSEARCH_URL="https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
+
+curl "$ELASTICSEARCH_URL" > $TEST_DATA_DIR/elasticsearch.tar.gz
+tar xzf $TEST_DATA_DIR/elasticsearch.tar.gz -C $TEST_DATA_DIR/
+ELASTICSEARCH_DIR=$TEST_DATA_DIR/elasticsearch-2.3.5
+
+# start elasticsearch cluster
+nohup $ELASTICSEARCH_DIR/bin/elasticsearch &
+

+TEST_PROGRAM_JAR=$CURRENT_DIR/flink-java-project/target/flink-java-project-0.1.jar
+
+# run the Flink job
+$FLINK_DIR/bin/flink run -c org.apache.flink.quickstart.ElasticsearchStreamingJob $TEST_PROGRAM_JAR
+
+touch $TEST_DATA_DIR/output
+
+curl 'localhost:9200/my-index/_search?q=*&pretty&size=21' > $TEST_DATA_DIR/output
+
+if [ -n "$(grep '\"total\" : 21' $TEST_DATA_DIR/output)" ]; then
+echo "Quickstarts end to end test pass."
+else
+ 

[jira] [Commented] (FLINK-9219) Add support for OpenGIS features in Table & SQL API

2018-05-15 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476720#comment-16476720
 ] 

Xingcan Cui commented on FLINK-9219:


Thanks for the feedback [~twalthr]!

The majority of your comments make sense to me. However, regarding using the 
Calcite type or {{com.esri.core.geometry.Geometry}}, I still don't quite get your 
point. I wonder if you could explain a little bit more about that.

Thanks, Xingcan


> Add support for OpenGIS features in Table & SQL API
> ---
>
> Key: FLINK-9219
> URL: https://issues.apache.org/jira/browse/FLINK-9219
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> CALCITE-1968 added core functionality for handling 
> spatial/geographical/geometry data. It should not be too hard to expose these 
> features also in Flink's Table & SQL API. We would need a new {{GEOMETRY}} 
> data type and connect the function APIs.
> Right now the following functions are supported by Calcite:
> {code}
> ST_AsText, ST_AsWKT, ST_Boundary, ST_Buffer, ST_Contains, 
> ST_ContainsProperly, ST_Crosses, ST_Disjoint, ST_Distance, ST_DWithin, 
> ST_Envelope, ST_EnvelopesIntersect, ST_Equals, ST_GeometryType, 
> ST_GeometryTypeCode, ST_GeomFromText, ST_Intersects, ST_Is3D, 
> ST_LineFromText, ST_MakeLine, ST_MakePoint, ST_MLineFromText, 
> ST_MPointFromText, ST_MPolyFromText, ST_Overlaps, ST_Point, ST_PointFromText, 
> ST_PolyFromText, ST_SetSRID, ST_Touches, ST_Transform, ST_Union, ST_Within, 
> ST_Z
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476706#comment-16476706
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481796
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+   if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
--- End diff --

Minor: this could be generalized by iterating over a map of oldKey -> 
newKey, and I would also suggest logging a warning for deprecated keys.
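
For illustration, a minimal sketch of that generalization (the helper class and the new key name are hypothetical, not code from this PR; only "flink.stream.describe.backoff.base" comes from the diff above):

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: maps each deprecated key to its replacement and warns when one is used.
public final class DeprecatedKeyMapper {

	private static final Logger LOG = LoggerFactory.getLogger(DeprecatedKeyMapper.class);

	// oldKey -> newKey (illustrative entry; the real replacement key may differ)
	private static final Map<String, String> DEPRECATED_KEYS = new HashMap<>();
	static {
		DEPRECATED_KEYS.put("flink.stream.describe.backoff.base", "flink.list.shards.backoff.base");
	}

	public static Properties replaceDeprecatedKeys(Properties configProps) {
		for (Map.Entry<String, String> entry : DEPRECATED_KEYS.entrySet()) {
			String oldKey = entry.getKey();
			String newKey = entry.getValue();
			if (configProps.containsKey(oldKey)) {
				LOG.warn("Config key {} is deprecated; please use {} instead.", oldKey, newKey);
				configProps.setProperty(newKey, configProps.getProperty(oldKey));
				configProps.remove(oldKey);
			}
		}
		return configProps;
	}

	private DeprecatedKeyMapper() {
	}
}
{code}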


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh
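
For context, a minimal sketch of paginated shard discovery via ListShards with the AWS SDK for Java (assumes aws-java-sdk-kinesis 1.11.272 or newer, which added the listShards call; the stream name is a placeholder):

{code}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;

public class ListShardsExample {

	// Collects all shards of a stream, following the pagination token.
	static List<Shard> listAllShards(AmazonKinesis kinesis, String streamName) {
		List<Shard> shards = new ArrayList<>();
		ListShardsRequest request = new ListShardsRequest().withStreamName(streamName);
		while (true) {
			ListShardsResult result = kinesis.listShards(request);
			shards.addAll(result.getShards());
			if (result.getNextToken() == null) {
				return shards;
			}
			// When NextToken is set, StreamName must not be set on the request.
			request = new ListShardsRequest().withNextToken(result.getNextToken());
		}
	}

	public static void main(String[] args) {
		AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
		// "example-stream" is a placeholder stream name.
		System.out.println(listAllShards(kinesis, "example-stream").size());
	}
}
{code}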



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481796
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
 ---
@@ -181,6 +181,25 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}
 
+   public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
+   if (configProps.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE)) {
--- End diff --

Minor: this could be generalized by iterating over a map of oldKey -> 
newKey, and I would also suggest logging a warning for deprecated keys.


---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476704#comment-16476704
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481562
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I have deprecated some old properties and added the new ones. This is my 
first time using the @Deprecated annotation, so do let me know if there are 
better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 as a 
reference for these changes. 
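
A minimal sketch of one common deprecation pattern (only STREAM_DESCRIBE_BACKOFF_BASE and its value come from the diff above; the class name and the replacement constant are hypothetical):

{code}
public final class ExampleConsumerConfigConstants {

	/**
	 * The base backoff time between each describeStream attempt.
	 *
	 * @deprecated Use {@link #LIST_SHARDS_BACKOFF_BASE} instead.
	 */
	@Deprecated
	public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

	/** The base backoff time between each listShards attempt (hypothetical replacement key). */
	public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

	private ExampleConsumerConfigConstants() {
	}
}
{code}

The javadoc {{@deprecated}} tag documents what to use instead, while the {{@Deprecated}} annotation makes the compiler warn at call sites.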





> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481562
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";
--- End diff --

I have deprecated some old properties and added the new ones. This is my 
first time using the @Deprecated annotation, so do let me know if there are 
better ways of doing this. I used sample PRs like https://goo.gl/LWcrp2 as a 
reference for these changes. 





---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476701#comment-16476701
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481158
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

Done. Sorry for the inconvenience.


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476700#comment-16476700
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481152
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

Done. Sorry for the inconvenience.


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481158
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

Done. Sorry for the inconvenience.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188481152
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

Done. Sorry for the inconvenience.


---


[jira] [Commented] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476535#comment-16476535
 ] 

ASF GitHub Bot commented on FLINK-9372:
---

GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6018

[FLINK-9372] Typo on Elasticsearch website link (elastic.io --> elastic.co)

## What is the purpose of the change

Typo on Elasticsearch website link elastic.io --> elastic.co

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9372

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6018


commit 4e42304dce4bfbcab964548e4f153dc8c212b569
Author: Yadan.JS 
Date:   2018-05-15T22:03:22Z

[FLINK-9372] Typo on Elasticsearch website link




> Typo on Elasticsearch website link (elastic.io --> elastic.co)
> --
>
> Key: FLINK-9372
> URL: https://issues.apache.org/jira/browse/FLINK-9372
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, ElasticSearch Connector
>Affects Versions: 1.5.0, 1.4.1, 1.4.2, 1.5.1
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Minor
>
> Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6018: [FLINK-9372] Typo on Elasticsearch website link (e...

2018-05-15 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6018

[FLINK-9372] Typo on Elasticsearch website link (elastic.io --> elastic.co)

## What is the purpose of the change

Typo on Elasticsearch website link elastic.io --> elastic.co

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-9372

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6018.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6018


commit 4e42304dce4bfbcab964548e4f153dc8c212b569
Author: Yadan.JS 
Date:   2018-05-15T22:03:22Z

[FLINK-9372] Typo on Elasticsearch website link




---


[jira] [Created] (FLINK-9372) Typo on Elasticsearch website link (elastic.io --> elastic.co)

2018-05-15 Thread Yazdan Shirvany (JIRA)
Yazdan Shirvany created FLINK-9372:
--

 Summary: Typo on Elasticsearch website link (elastic.io --> 
elastic.co)
 Key: FLINK-9372
 URL: https://issues.apache.org/jira/browse/FLINK-9372
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, ElasticSearch Connector
Affects Versions: 1.4.2, 1.4.1, 1.5.0, 1.5.1
Reporter: Yazdan Shirvany
Assignee: Yazdan Shirvany


Typo on website link in Elasticsearch Java Docs (elastic.io --> elastic.co)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9371) High Availability JobManager Registration Failure

2018-05-15 Thread Jason Kania (JIRA)
Jason Kania created FLINK-9371:
--

 Summary: High Availability JobManager Registration Failure
 Key: FLINK-9371
 URL: https://issues.apache.org/jira/browse/FLINK-9371
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.4.2
Reporter: Jason Kania


The following error is happening intermittently on a 3-node cluster with two 
JobManagers configured in HA mode. When this happens, the two JobManager instances 
are associated with one another.

2018-05-15 19:00:06,400 INFO  
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  
- Trying to associate with JobManager leader 
akka.tcp://flink@aaa-1:5/user/jobmanager
2018-05-15 19:00:06,404 WARN  org.apache.flink.runtime.jobmanager.JobManager
    - Discard message 
LeaderSessionMessage(0bbe70c4-2642-4a08-912f-6cc09646281f,RegisterResourceManager
 akka://flink/user/resourcemanager-d6567c5d-85f4-4b18-8eac-cf9725d076a5) 
because there is currently no valid leader id known.
2018-05-15 19:00:16,418 ERROR 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  
- Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on 
[ActorSelection[Anchor(akka://flink/), Path(/user/jobmanager)]] after [1 
ms]. Sender[null] sent message of type 
"org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage".
    at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
    at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
    at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
    at 
akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
    at 
akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
    at 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
    at java.lang.Thread.run(Thread.java:748)

 

Sometimes the following type of log message also appears after the previous one:

2018-05-15 19:13:47,525 WARN  
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager  
- Discard message 
LeaderSessionMessage(5cab29b9-10d3-4b25-b934-f06b82be15b5,TriggerRegistrationAtJobManager
 akka.tcp://flink@aaa-1:5/user/jobmanager) because the expected leader 
session ID 61075587-51da-4e58-ac4f-9ea118ccdde9 did not equal the received 
leader session ID 5cab29b9-10d3-4b25-b934-f06b82be15b5.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
Also, as for the package name or the place to put it, I don't feel 
competent to suggest a place, so I will be happy to apply your suggestion.


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476320#comment-16476320
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
Also, as for the package name or the place to put it, I don't feel 
competent to suggest a place, so I will be happy to apply your suggestion.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
As for the snapshot binary data, I do understand that it should be created 
with the appropriate flink version (in this case, in theory, with flink 1.3), and I 
tried really hard to do so until I found out that this test is incompatible 
with 1.3 and the data could not be generated with flink 1.3. Later I found the 
comment on the test class that also states so: 

> Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) 
> * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
> * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
> * works properly.

Also, the commented code does not compile with flink 1.3 (but this is a minor 
thing).

Data serialized with the version of avro used in flink 1.3 (1.7.7) is not 
binary compatible with avro 1.8.2 (in flink 1.4+), due to changes in how 
SpecificFixed is constructed.

Therefore, the way I regenerated this snapshot data is that I ran the commented 
code on the current branch. That is also why I changed a few descriptions in that 
test, as it tests compatibility of `PojoSerializer` with `AvroSerializer` rather 
than binary backwards compatibility. 

Nevertheless I am more than happy to hear any comments on that. 


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476314#comment-16476314
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5995
  
As for the snapshot binary data, I do understand that it should be created 
with the appropriate flink version (in this case, in theory, with flink 1.3), and I 
tried really hard to do so until I found out that this test is incompatible 
with 1.3 and the data could not be generated with flink 1.3. Later I found the 
comment on the test class that also states so: 

> Important: Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3) 
> * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
> * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
> * works properly.

Also, the commented code does not compile with flink 1.3 (but this is a minor 
thing).

Data serialized with the version of avro used in flink 1.3 (1.7.7) is not 
binary compatible with avro 1.8.2 (in flink 1.4+), due to changes in how 
SpecificFixed is constructed.

Therefore, the way I regenerated this snapshot data is that I ran the commented 
code on the current branch. That is also why I changed a few descriptions in that 
test, as it tests compatibility of `PojoSerializer` with `AvroSerializer` rather 
than binary backwards compatibility. 

Nevertheless I am more than happy to hear any comments on that. 


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476299#comment-16476299
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386920
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
//  Utilities
// 

 
+   private static boolean isGenericRecord(Class<?> type) {
+   return !SpecificRecord.class.isAssignableFrom(type) &&
+   GenericRecord.class.isAssignableFrom(type);
+   }
+
@Override
public TypeSerializer<T> duplicate() {
-   return new AvroSerializer<>(type);
+   if (schemaString != null) {
+   return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
--- End diff --

I didn't think it through well. I thought we needed to create a deep copy of the 
schema, but as it is stateless I think we can just pass the schema. My mistake. 
Correct me if I am wrong. 
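
A sketch of the alternative being discussed, assuming the serializer kept the parsed Schema in a field (named {{schema}} here purely for illustration) and that the Schema instance can be safely shared:

{code}
@Override
public TypeSerializer<T> duplicate() {
	if (schema != null) {
		// Share the already-parsed Schema instead of re-parsing the schema string.
		return new AvroSerializer<>(type, schema);
	}
	return new AvroSerializer<>(type);
}
{code}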


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386920
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -275,9 +304,19 @@ else if (configSnapshot instanceof 
AvroSerializerConfigSnapshot) {
//  Utilities
// 

 
+   private static boolean isGenericRecord(Class<?> type) {
+   return !SpecificRecord.class.isAssignableFrom(type) &&
+   GenericRecord.class.isAssignableFrom(type);
+   }
+
@Override
public TypeSerializer<T> duplicate() {
-   return new AvroSerializer<>(type);
+   if (schemaString != null) {
+   return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
--- End diff --

I didn't think it through well. I thought we needed to create a deep copy of the 
schema, but as it is stateless I think we can just pass the schema. My mistake. 
Correct me if I am wrong. 


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386286
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.reader = reader;
+   if (reader != null) {
+  

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476291#comment-16476291
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188386286
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordCla

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476283#comment-16476283
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188385527
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordCla

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188385527
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.reader = reader;
+   if (reader != null) {
+  

[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476260#comment-16476260
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188379050
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

remove unnecessary change


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476257#comment-16476257
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378986
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

remove unnecessary change


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has a 
> restricted rate limit on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378986
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
--- End diff --

remove unnecessary change


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188379050
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -97,8 +228,7 @@ public void testClientConfigOverride() {
 
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, 
"kinesisClient");
ClientConfiguration clientConfiguration = 
Whitebox.getInternalState(kinesisClient,
-   "clientConfiguration");
+   "clientConfiguration");
assertEquals(, clientConfiguration.getSocketTimeout());
}
-
--- End diff --

remove unnecessary change


---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476252#comment-16476252
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378324
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

I think that this should be one unit of work.


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Changing this to ListShards will provide more
> flexibility on rate limits, as ListShards allows 100 requests per second per
> data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188378324
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

I think that this should be one unit of work.


---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476243#comment-16476243
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377363
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

For the time being, I will keep the older names and deprecate the property names in a 
follow-up PR. I am not sure what the policy is with respect to change size, but I feel 
that breaking this up into two PRs will make it easier to review. Let me know if you 
feel otherwise, and I will pull those changes into this PR. 
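A minimal sketch of what keeping the old keys while steering users toward replacements
could look like; the new constant name below is hypothetical and may not match what the
follow-up PR eventually chooses:

    /** Sketch only: deprecating the describeStream config keys in favour of listShards ones. */
    public final class ConsumerConfigConstantsSketch {

        /** Hypothetical new key; the actual name is decided in the follow-up PR. */
        public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

        /** Old key, kept so existing user configurations keep working. */
        @Deprecated
        public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

        private ConsumerConfigConstantsSketch() {}
    }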


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Changing this to ListShards will provide more
> flexibility on rate limits, as ListShards allows 100 requests per second per
> data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476244#comment-16476244
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377376
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in acti

[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476246#comment-16476246
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377415
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -353,19 +355,21 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
private List getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
List shardsOfStream = new ArrayList<>();
 
-   DescribeStreamResult describeStreamResult;
+   // List Shards returns just the first 1000 shard entries. In 
order to read the entire stream,
+   // we need to use the returned nextToken to get additional 
shards.
+   ListShardsResult listShardsResult;
+   String startShardToken = null;
do {
-   describeStreamResult = describeStream(streamName, 
lastSeenShardId);
-
-   List shards = 
describeStreamResult.getStreamDescription().getShards();
+   listShardsResult = listShards(streamName, 
lastSeenShardId, startShardToken);
--- End diff --

Thanks for catching this. I have fixed it by clearing shardsOfStream so that an empty 
shardsOfStream is returned in case of ExpiredTokenException. The intended behavior: in 
the unlikely case of an expired next token, we simply return an empty shardsOfStream. 
This should be fine, since when no new shards are discovered the method already returns 
an empty shardsOfStream by default. 
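A hedged sketch of the intended paging behaviour. The listShards helper is the wrapper
from the diff above; the result, shard, and exception types assume the AWS SDK v1
Kinesis model classes (ListShardsResult, Shard, ExpiredNextTokenException), and this is
not the PR's exact code:

    // import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
    // import com.amazonaws.services.kinesis.model.ListShardsResult;
    // import com.amazonaws.services.kinesis.model.Shard;
    // (method sketched as if it lived inside the proxy class that owns listShards)

    private List<Shard> getShardsOfStreamSketch(String streamName, String lastSeenShardId)
            throws InterruptedException {
        List<Shard> shardsOfStream = new ArrayList<>();
        String nextToken = null;
        ListShardsResult result;
        do {
            try {
                // listShards(...) already retries on LimitExceededException with backoff
                result = listShards(streamName, lastSeenShardId, nextToken);
            } catch (ExpiredNextTokenException e) {
                // the token expires after ~300 s; behave as if no new shards were discovered
                shardsOfStream.clear();
                return shardsOfStream;
            }
            shardsOfStream.addAll(result.getShards());
            nextToken = result.getNextToken();
        } while (nextToken != null);
        return shardsOfStream;
    }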


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently the DescribeStream AWS API used to get the list of shards has a
> restrictive rate limit on AWS (5 requests per second per account). This is
> problematic when running multiple Flink jobs on the same account, since each
> subtask calls DescribeStream. Changing this to ListShards will provide more
> flexibility on rate limits, as ListShards allows 100 requests per second per
> data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476245#comment-16476245
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377393
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in acti

[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476247#comment-16476247
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377434
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -70,20 +113,107 @@ public void 
testIsRecoverableExceptionWithNullErrorType() {
}
 
@Test
-   public void testCustomConfigurationOverride() {
-   Properties configProps = new Properties();
-   configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
-   KinesisProxy proxy = new KinesisProxy(configProps) {
-   @Override
-   protected AmazonKinesis createKinesisClient(Properties 
configProps) {
-   ClientConfiguration clientConfig = new 
ClientConfigurationFactory().getConfig();
-   clientConfig.setSocketTimeout(1);
-   return AWSUtil.createKinesisClient(configProps, 
clientConfig);
+   public void testGetShardList() throws Exception {
+   List shardIds =
+   Arrays.asList(
+   "shardId-",
+   "shardId-0001",
+   "shardId-0002",
+   "shardId-0003");
+   shardIdSet = new HashSet<>(shardIds);
+   shards =
+   shardIds
+   .stream()
+   .map(shardId -> new 
Shard().withShardId(shardId))
+   .collect(Collectors.toList());
+   Properties kinesisConsumerConfig = new Properties();
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"fake_accesskey");
+   kinesisConsumerConfig.setProperty(
+   ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"fake_secretkey");
+   KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+   AmazonKinesis mockClient = mock(AmazonKinesis.class);
+   Whitebox.setInternalState(kinesisProxy, "kinesisClient", 
mockClient);
+
+   ListShardsResult responseWithMoreData =
+   new 
ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+   ListShardsResult responseFinal =
+   new 
ListShardsResult().withShards(shards.subList(2, 
shards.size())).withNextToken(null);
+   doReturn(responseWithMoreData)
+   .when(mockClient)
+   
.listShards(argThat(initialListShardsRequestMatcher()));
+   
doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+   HashMap streamHashMap =
+   
createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+   GetShardListResult shardListResult = 
kinesisProxy.getShardList(streamHashMap);
+
+   Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+   Set expectedStreams = new HashSet<>();
+   expectedStreams.add(fakeStreamName);
+   
Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), 
expectedStreams);
+   List actualShardList =
+   
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+   List expectedStreamShard = new ArrayList<>();
+   System.out.println(actualShardList.toString());
+   assertThat(actualShardList, hasSize(4));
+   for (int i = 0; i < 4; i++) {
+   StreamShardHandle shardHandle =
+   new StreamShardHandle(
+   fakeStreamName,
+   new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+   expectedStreamShard.add(shardHandle);
+   }
+
+   Assert.assertThat(
+   actualShardList,
+   containsInAnyOrder(
+   expectedStreamShard.toArray(ne

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377393
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch (ResourceNotFoundException

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377434
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -70,20 +113,107 @@ public void 
testIsRecoverableExceptionWithNullErrorType() {
}
 
@Test
-   public void testCustomConfigurationOverride() {
-   Properties configProps = new Properties();
-   configProps.setProperty(AWSConfigConstants.AWS_REGION, 
"us-east-1");
-   KinesisProxy proxy = new KinesisProxy(configProps) {
-   @Override
-   protected AmazonKinesis createKinesisClient(Properties 
configProps) {
-   ClientConfiguration clientConfig = new 
ClientConfigurationFactory().getConfig();
-   clientConfig.setSocketTimeout(1);
-   return AWSUtil.createKinesisClient(configProps, 
clientConfig);
+   public void testGetShardList() throws Exception {
+   List shardIds =
+   Arrays.asList(
+   "shardId-",
+   "shardId-0001",
+   "shardId-0002",
+   "shardId-0003");
+   shardIdSet = new HashSet<>(shardIds);
+   shards =
+   shardIds
+   .stream()
+   .map(shardId -> new 
Shard().withShardId(shardId))
+   .collect(Collectors.toList());
+   Properties kinesisConsumerConfig = new Properties();
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, 
"us-east-1");
+   
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, 
"fake_accesskey");
+   kinesisConsumerConfig.setProperty(
+   ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, 
"fake_secretkey");
+   KinesisProxy kinesisProxy = new 
KinesisProxy(kinesisConsumerConfig);
+   AmazonKinesis mockClient = mock(AmazonKinesis.class);
+   Whitebox.setInternalState(kinesisProxy, "kinesisClient", 
mockClient);
+
+   ListShardsResult responseWithMoreData =
+   new 
ListShardsResult().withShards(shards.subList(0, 2)).withNextToken(NEXT_TOKEN);
+   ListShardsResult responseFinal =
+   new 
ListShardsResult().withShards(shards.subList(2, 
shards.size())).withNextToken(null);
+   doReturn(responseWithMoreData)
+   .when(mockClient)
+   
.listShards(argThat(initialListShardsRequestMatcher()));
+   
doReturn(responseFinal).when(mockClient).listShards(argThat(listShardsNextToken(NEXT_TOKEN)));
+   HashMap streamHashMap =
+   
createInitialSubscribedStreamsToLastDiscoveredShardsState(Arrays.asList(fakeStreamName));
+   GetShardListResult shardListResult = 
kinesisProxy.getShardList(streamHashMap);
+
+   Assert.assertEquals(shardListResult.hasRetrievedShards(), true);
+
+   Set expectedStreams = new HashSet<>();
+   expectedStreams.add(fakeStreamName);
+   
Assert.assertEquals(shardListResult.getStreamsWithRetrievedShards(), 
expectedStreams);
+   List actualShardList =
+   
shardListResult.getRetrievedShardListOfStream(fakeStreamName);
+   List expectedStreamShard = new ArrayList<>();
+   System.out.println(actualShardList.toString());
+   assertThat(actualShardList, hasSize(4));
+   for (int i = 0; i < 4; i++) {
+   StreamShardHandle shardHandle =
+   new StreamShardHandle(
+   fakeStreamName,
+   new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i)));
+   expectedStreamShard.add(shardHandle);
+   }
+
+   Assert.assertThat(
+   actualShardList,
+   containsInAnyOrder(
+   expectedStreamShard.toArray(new 
StreamShardHandle[actualShardList.size()])));
+   }
+
+   private static class ListShardsRequestMatcher extends 
TypeSafeDiagnosingMatcher {
+   private final String shardId;
+   private final String nextToken;
+

[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377363
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ---
@@ -65,14 +65,14 @@ public SentinelSequenceNumber 
toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis 
stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = 
"flink.stream.initpos.timestamp.format";
 
-   /** The base backoff time between each describeStream attempt. */
-   public static final String STREAM_DESCRIBE_BACKOFF_BASE = 
"flink.stream.describe.backoff.base";
--- End diff --

For the time being, I will keep the older names and deprecate the property names in a 
follow-up PR. I am not sure what the policy is with respect to change size, but I feel 
that breaking this up into two PRs will make it easier to review. Let me know if you 
feel otherwise, and I will pull those changes into this PR. 


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377415
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -353,19 +355,21 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
private List getShardsOfStream(String streamName, 
@Nullable String lastSeenShardId) throws InterruptedException {
List shardsOfStream = new ArrayList<>();
 
-   DescribeStreamResult describeStreamResult;
+   // List Shards returns just the first 1000 shard entries. In 
order to read the entire stream,
+   // we need to use the returned nextToken to get additional 
shards.
+   ListShardsResult listShardsResult;
+   String startShardToken = null;
do {
-   describeStreamResult = describeStream(streamName, 
lastSeenShardId);
-
-   List shards = 
describeStreamResult.getStreamDescription().getShards();
+   listShardsResult = listShards(streamName, 
lastSeenShardId, startShardToken);
--- End diff --

Thanks for catching this. I have fixed it by clearing shardsOfStream so that an empty 
shardsOfStream is returned in case of ExpiredTokenException. The intended behavior: in 
the unlikely case of an expired next token, we simply return an empty shardsOfStream. 
This should be fine, since when no new shards are discovered the method already returns 
an empty shardsOfStream by default. 


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-15 Thread kailashhd
Github user kailashhd commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r188377376
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ---
@@ -382,50 +386,62 @@ protected static boolean 
isRecoverableException(AmazonServiceException ex) {
 * @param startShardId which shard to start with for this describe 
operation (earlier shard's infos will not appear in result)
 * @return the result of the describe stream operation
 */
-   private DescribeStreamResult describeStream(String streamName, 
@Nullable String startShardId) throws InterruptedException {
-   final DescribeStreamRequest describeStreamRequest = new 
DescribeStreamRequest();
-   describeStreamRequest.setStreamName(streamName);
-   describeStreamRequest.setExclusiveStartShardId(startShardId);
+   private ListShardsResult listShards(String streamName, @Nullable String 
startShardId,
+   

@Nullable String startNextToken)
+   throws InterruptedException {
+   final ListShardsRequest listShardsRequest = new 
ListShardsRequest();
+   if (startNextToken == null) {
+   
listShardsRequest.setExclusiveStartShardId(startShardId);
+   listShardsRequest.setStreamName(streamName);
+   } else {
+   // Note the nextToken returned by AWS expires within 
300 sec.
+   listShardsRequest.setNextToken(startNextToken);
+   }
 
-   DescribeStreamResult describeStreamResult = null;
+   ListShardsResult listShardsResults = null;
 
-   // Call DescribeStream, with full-jitter backoff (if we get 
LimitExceededException).
+   // Call ListShards, with full-jitter backoff (if we get 
LimitExceededException).
int attemptCount = 0;
-   while (describeStreamResult == null) { // retry until we get a 
result
+   // List Shards returns just the first 1000 shard entries. Make 
sure that all entries
+   // are taken up.
+   while (listShardsResults == null) { // retry until we get a 
result
try {
-   describeStreamResult = 
kinesisClient.describeStream(describeStreamRequest);
+   listShardsResults = 
kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
-   describeStreamBaseBackoffMillis, 
describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
-   LOG.warn("Got LimitExceededException when 
describing stream " + streamName + ". Backing off for "
-   + backoffMillis + " millis.");
+   listShardsBaseBackoffMillis, 
listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
+   LOG.warn("Got LimitExceededException 
when listing shards from stream " + streamName
+   + ". 
Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
-   } catch (ResourceNotFoundException re) {
-   throw new RuntimeException("Error while getting 
stream details", re);
-   }
-   }
-
-   String streamStatus = 
describeStreamResult.getStreamDescription().getStreamStatus();
-   if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || 
streamStatus.equals(StreamStatus.UPDATING.toString( {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("The status of stream " + streamName + 
" is " + streamStatus + "; result of the current " +
-   "describeStream operation will not 
contain any shard information.");
+   } catch (ResourceInUseException reInUse) {
+   if (LOG.isWarnEnabled()) {
+   // List Shards will throw an exception 
if stream in not in active state. Will return
+   LOG.warn("The stream is currently not 
in active state. Reusing the older state "
+   + "for the time being");
+   break;
+   }
+   } catch (ResourceNotFoundException

[jira] [Closed] (FLINK-9341) Oracle: "Type is not supported: Date"

2018-05-15 Thread Ken Geis (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ken Geis closed FLINK-9341.
---
Resolution: Not A Bug

This issue happened because I used BasicTypeInfo.DATE_TYPE_INFO in my input format 
instead of SqlTimeTypeInfo.TIMESTAMP. I changed that and the error went away.

I agree with [~Sergey Nuyanzin]'s suggestion to include the full class name in 
the exception.
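For reference, a small sketch of the fix described above, with a made-up two-column row
layout; the only point is SqlTimeTypeInfo.TIMESTAMP versus BasicTypeInfo.DATE_TYPE_INFO
when declaring the row type for the JDBC input:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;

    public final class OracleDateTypeInfoSketch {
        // Declare the Oracle DATE/SYSDATE column as a SQL timestamp so the Table API accepts it.
        public static RowTypeInfo rowTypeInfo() {
            return new RowTypeInfo(
                    BasicTypeInfo.STRING_TYPE_INFO,   // some id column (illustrative)
                    SqlTimeTypeInfo.TIMESTAMP);       // instead of BasicTypeInfo.DATE_TYPE_INFO
        }
    }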

> Oracle: "Type is not supported: Date"
> -
>
> Key: FLINK-9341
> URL: https://issues.apache.org/jira/browse/FLINK-9341
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
>Reporter: Ken Geis
>Priority: Major
>
> When creating a Table from an Oracle JDBCInputFormat with a date column, I 
> get the error "Type is not supported: Date". This happens with as simple a 
> query as
> {code:java}
> SELECT SYSDATE FROM DUAL{code}
>  Stack trace:
> {noformat}
> Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
> Date
>     at 
> org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  ~[scala-library-2.11.11.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
> ~[scala-library-2.11.11.jar:na]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown
>  Source) ~[na:na]
> (at my code...)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9362) Document that Flink doesn't guarantee transaction of modifying state and register timers in processElement()

2018-05-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-9362.
---
Resolution: Fixed

> Document that Flink doesn't guarantee transaction of modifying state and 
> register timers in processElement()
> 
>
> Key: FLINK-9362
> URL: https://issues.apache.org/jira/browse/FLINK-9362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9362) Document that Flink doesn't guarantee transaction of modifying state and register timers in processElement()

2018-05-15 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476191#comment-16476191
 ] 

Bowen Li commented on FLINK-9362:
-

Makes sense. It's not a traditional 'transactional guarantee', though it can provide 
some consistency at checkpoint time. Closing

> Document that Flink doesn't guarantee transaction of modifying state and 
> register timers in processElement()
> 
>
> Key: FLINK-9362
> URL: https://issues.apache.org/jira/browse/FLINK-9362
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3599) GSoC: Code Generation in Serializers

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476180#comment-16476180
 ] 

ASF GitHub Bot commented on FLINK-3599:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2211
  
Hi,

There were a number of bugs and painful parts in the code, and 
unfortunately I'm not really sure whether they are major blockers or easily 
fixable. It surely wouldn't be 1-2 days of work, but a few weeks might be 
realistic.

As an example from just off the top of my head, we choke in situations 
where the POJO has a generic argument T, and a field of type T, and we tell 
Flink with a `.returns` about the generic argument.

There were some other random test failures in older tests, which would 
require some debugging.

I think we also have the problem that we don't correctly clean up the old 
generated classes, so they cause a memory leak across Flink jobs.

Another potential issue is that since this PR, there was a new mechanism 
added to the Flink serializers for dealing with compatibility of serializers 
(e.g., to ensure that savepoints work across Flink versions), and I'm not sure 
whether it would be straightforward to update this PR for this. (For example, 
`PojoSerializer.ensureCompatibility` did not exist back then, and it looks a 
bit scary at first glance.)

One of the pain points was shipping generated code from the driver program 
to the TMs. The problem is that if you simply serialize an instance of a 
generated class in the driver (for shipping the job to the cluster), the TM 
cannot just simply deserialize it, since the generated class does not exist in 
that JVM. The good news here is that since this PR I accidentally stumbled upon 
a new solution to this (using `writeReplace` and `readResolve`), which would be 
nicer than the current solution of this PR.

Note that there is also this PR for similar code generation for the sorters:
https://github.com/apache/flink/pull/3511
This has a lot of similarities but fewer pain points, because the sorters 
are better isolated from the other parts of Flink than the serializers, and 
also because there we don't have to ship generated code from the driver to the 
TMs (because the sorters are created on the TMs). So I think if we wanted to 
push this serializer codegen PR, then first we should start with the sorter 
codegen PR instead, because it is the easier one of the two. I think that one 
is actually quite close to being mergeable. (Note that I think we solved the 
"cleanup between Flink jobs" issue in that PR, and that solution could be 
adapted to this PR.)

Btw. we are in the planning phase of an MSc thesis with (@mukrram-bajwa), 
whose work might subsume both of these PRs. The idea is to use Graal to do many 
of the specializations that are in these PRs. The nice thing would be that the 
modifications to Flink's code would be much less compared to these PRs, as the 
magic would happen mostly inside a Graal compilation plugin as custom 
optimization passes.

Best,
Gábor


> GSoC: Code Generation in Serializers
> 
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Márton Balassi
>Assignee: Gabor Horvath
>Priority: Major
>  Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7814) Add BETWEEN expression to Table API

2018-05-15 Thread Sergey Nuyanzin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin updated FLINK-7814:
---
Description: * The Table API does not have a BETWEEN expression. BETWEEN is 
quite handy when defining join predicates for window joins.  (was: The Table 
API does not have a BETWEEN expression. BETWEEN is quite handy when defining 
join predicates for window joins.)

> Add BETWEEN expression to Table API 
> 
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Priority: Minor
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.
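Until BETWEEN exists, the predicate has to be spelled out as a conjunction. A rough
sketch with made-up table and column names, using the string-expression syntax (which
may need adjusting for the exact Flink version in use):

    // 'orders' is a Table obtained from a TableEnvironment; 'amount' is one of its columns.
    // "amount >= 10 && amount <= 100" is the hand-written equivalent of
    // SQL's "amount BETWEEN 10 AND 100".
    Table inRange = orders.filter("amount >= 10 && amount <= 100");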



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

2018-05-15 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/2211
  
Hi,

There were a number of bugs and painful parts in the code, and 
unfortunately I'm not really sure whether they are major blockers or easily 
fixable. It surely wouldn't be 1-2 days of work, but a few weeks might be 
realistic.

As an example from just off the top of my head, we choke in situations 
where the POJO has a generic argument T, and a field of type T, and we tell 
Flink with a `.returns` about the generic argument.

There were some other random test failures in older tests, which would 
require some debugging.

I think we also have the problem that we don't correctly clean up the old 
generated classes, so they cause a memory leak across Flink jobs.

Another potential issue is that since this PR, there was a new mechanism 
added to the Flink serializers for dealing with compatibility of serializers 
(e.g., to ensure that savepoints work across Flink versions), and I'm not sure 
whether it would be straightforward to update this PR for this. (For example, 
`PojoSerializer.ensureCompatibility` did not exist back then, and it looks a 
bit scary at first glance.)

One of the pain points was shipping generated code from the driver program 
to the TMs. The problem is that if you simply serialize an instance of a 
generated class in the driver (for shipping the job to the cluster), the TM 
cannot just simply deserialize it, since the generated class does not exist in 
that JVM. The good news here is that since this PR I accidentally stumbled upon 
a new solution to this (using `writeReplace` and `readResolve`), which would be 
nicer than the current solution of this PR.

Note that there is also this PR for similar code generation for the sorters:
https://github.com/apache/flink/pull/3511
This has a lot of similarities but fewer pain points, because the sorters 
are better isolated from the other parts of Flink than the serializers, and 
also because there we don't have to ship generated code from the driver to the 
TMs (because the sorters are created on the TMs). So I think if we wanted to 
push this serializer codegen PR, then first we should start with the sorter 
codegen PR instead, because it is the easier one of the two. I think that one 
is actually quite close to being mergeable. (Note that I think we solved the 
"cleanup between Flink jobs" issue in that PR, and that solution could be 
adapted to this PR.)

Btw. we are in the planning phase of an MSc thesis with (@mukrram-bajwa), 
whose work might subsume both of these PRs. The idea is to use Graal to do many 
of the specializations that are in these PRs. The nice thing would be that the 
modifications to Flink's code would be much less compared to these PRs, as the 
magic would happen mostly inside a Graal compilation plugin as custom 
optimization passes.

Best,
Gábor


---


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476118#comment-16476118
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188321914
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param  type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema
+   extends RegistryAvroDeserializationSchema {
+
+   private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider provider for schema coder that reads 
writer schema from Confluent Schema Registry
+*/
+   private ConfluentRegistryAvroDeserializationSchema(
+   Class recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader, schemaCoderProvider);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema schema of produced records
+* @param urlurl of schema registry to connect
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(
+   Schema schema, String url) {
+   return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+   }
+
+   /**
+* Creates {@link ConfluentRegistryAvroDeserializationSchema} that 
produces {@link GenericRecord}
+* using provided reader schema and looks up writer schema in Confluent 
Schema Registry.
+*
+* @param schema  schema of produced records
+* @param url url of schema registry to connect
+* @param identityMapCapacity maximum number of cached schema versions 
(default: 1000)
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static ConfluentRegistryAvroDeserializationSchema 
forGeneric(
+   Schema schema, String url, int identityMapCapacity) {
+   return new ConfluentRegistryAvroDeserializationSchema<>(
+   GenericRecord.class,
+   schema,
+   new CachedSchemaCoderProvider(url, 
identityMapCapacity));
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were generated from avro
+* schema and looks u

[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476133#comment-16476133
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188350196
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder}.
+ *
+ * @param  type of record it produces
+ */
+public class RegistryAvroDeserializationSchema extends 
AvroDeserializationSchema {
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input 
stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation 
of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(
+   Class recordClazz,
+   @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   getInputStream().setBuffer(message);
+   Schema writerSchema = 
schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
+   }
+   }
+
+   private void readObject(ObjectInputStream ois) throws 
ClassNotFoundException, IOException {
--- End diff --

See above, would suggest to avoid readObject()
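
A hedged sketch of that suggestion with simplified stand-in types (not the PR's final
code): drop readObject() and create the transient member lazily, so freshly constructed
and Java-deserialized instances share a single initialization path.

    import java.io.Serializable;

    class RegistrySchemaSketch<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Stand-in for SchemaCoder.SchemaCoderProvider; must itself be serializable. */
        interface CoderProvider extends Serializable {
            Object get();
        }

        private final CoderProvider coderProvider;
        private transient Object coder; // null right after Java deserialization

        RegistrySchemaSketch(CoderProvider coderProvider) {
            this.coderProvider = coderProvider;
        }

        private Object coder() {
            if (coder == null) {
                coder = coderProvider.get(); // lazily (re)created; no readObject() override needed
            }
            return coder;
        }

        T deserialize(byte[] message) {
            Object c = coder(); // every use goes through the lazy accessor
            // ... decode 'message' with the coder; decoding elided in this sketch ...
            return null;
        }
    }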


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476124#comment-16476124
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328437
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
--- End diff --

Double Apache header


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476137#comment-16476137
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188349853
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+	private transient SchemaCoder schemaCoder;
+
+	/**
+	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+	 *
+	 * @param recordClazz         class to which deserialize. Should be either
+	 *                            {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
+	 *                            {@link GenericRecord}
+	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder}
+	 *                            that will be used for schema reading
+	 */
+	protected RegistryAvroDeserializationSchema(
+			Class<T> recordClazz,
+			@Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader);
--- End diff --

Empty line / indentation


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476127#comment-16476127
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188347289
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

This class uses a mixture of eager initialization of transient members (in 
readObject()) and lazy initialization (in getDatumReader()).

I would suggest doing it all one way or the other.

My suggestion would be to avoid `readObject()` whenever possible. If you 
encounter an exception during schema parsing (and it may be something weird 
from Jackson, like a missing manifest due to a shading issue), you will get the 
most unhelpful exception stack trace ever, in the weirdest place (like Flink's 
RPC message decoder).

In my experience, when a user sees such a stack trace, they are rarely able 
to diagnose it. Best case, they show up on the mailing list; worst case, they 
give up.
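
As an illustration of that lazy-initialization approach, here is a minimal sketch assuming a hypothetical `checkAvroInitialized()` helper; the class and field names are illustrative, not taken from the pull request. All transient Avro state is built on first use in `deserialize()`, so a schema-parsing failure surfaces with a readable stack trace instead of inside Java deserialization of the operator.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

import java.io.IOException;
import java.io.Serializable;

// Sketch only: no readObject(), all transient state is created lazily.
class LazyAvroDeserializerSketch<T> implements Serializable {

	private static final long serialVersionUID = 1L;

	/** Serializable configuration: survives Java serialization of the operator. */
	private final Class<T> recordClazz;

	/** Runtime state: rebuilt lazily on the task, never serialized. */
	private transient GenericDatumReader<T> datumReader;

	LazyAvroDeserializerSketch(Class<T> recordClazz) {
		this.recordClazz = recordClazz;
	}

	T deserialize(byte[] message) throws IOException {
		// any schema-parsing error now shows up here, on the task thread
		checkAvroInitialized();
		Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
		return datumReader.read(null, decoder);
	}

	private void checkAvroInitialized() {
		if (datumReader != null) {
			return;
		}
		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
			Schema schema = SpecificData.get().getSchema(recordClazz);
			datumReader = new SpecificDatumReader<T>(schema);
		} else {
			Schema schema = ReflectData.get().getSchema(recordClazz);
			datumReader = new ReflectDatumReader<T>(schema);
		}
	}
}
```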


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476126#comment-16476126
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188348636
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
--- End diff --

That would mean initializing it all lazily, in `getDatumReader()` or in a 
`checkAvroInitialized()` method.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476135#comment-16476135
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188355390
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -99,9 +108,29 @@
 
/**
 * Creates a new AvroSerializer for the type indicated by the given class.
+* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
+* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
 */
 public AvroSerializer(Class<T> type) {
+   Preconditions.checkArgument(!isGenericRecord(type),
--- End diff --

Minor: other precondition checks in this class are done via statically 
imported methods. While this is not consistent across the code base, I would 
suggest keeping it consistent within a class as much as possible.
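
For reference, a minimal sketch of the statically imported style, using Flink's `org.apache.flink.util.Preconditions`; the guard shown is a simplified stand-in for the `isGenericRecord(type)` helper in the quoted diff, not the actual implementation.

```java
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

import org.apache.avro.generic.GenericRecord;

// Illustrative only: mirrors the statically imported check style.
class PreconditionsStyleSketch<T> {

	private final Class<T> type;

	PreconditionsStyleSketch(Class<T> type) {
		// statically imported checks read without the Preconditions. prefix
		checkArgument(!GenericRecord.class.isAssignableFrom(type),
				"For GenericRecord use the constructor that also takes a Schema.");
		this.type = checkNotNull(type);
	}
}
```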


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476129#comment-16476129
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188338897
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	/**
+	 * Class to deserialize to.
+	 */
+	private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

You can make this variable final as well.


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476130#comment-16476130
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328926
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	/**
+	 * Class to deserialize to.
+	 */
+	private Class<T> recordClazz;
+
+   private String schemaString = null;
--- End diff --

I would avoid null initialization, it is redundant. It does nothing 
semantically (fields are null anyway) but it still exists as byte code, hence 
costs CPU cycles.
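
To illustrate the point, a tiny sketch (names are made up):

```java
// The explicit "= null" compiles to an extra field assignment in the constructor
// with no semantic effect; the plain declaration yields the same null default.
class NullInitSketch {
	private String explicitlyInitialized = null; // redundant: emits an assignment in <init>
	private String implicitlyNull;               // same runtime value, no extra bytecode
}
```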


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476120#comment-16476120
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188325819
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+
+/**
+ * Reads schema using Confluent Schema Registry protocol.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+   private final SchemaRegistryClient schemaRegistryClient;
+
+	/**
+	 * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
+	 * schema registry.
+	 *
+	 * @param schemaRegistryClient client to connect schema registry
+	 */
+	public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
+		this.schemaRegistryClient = schemaRegistryClient;
+	}
+
+	@Override
+	public Schema readSchema(InputStream in) throws Exception {
+		DataInputStream dataInputStream = new DataInputStream(in);
+
+		if (dataInputStream.readByte() != 0) {
+			throw new RuntimeException("Unknown data format. Magic number does not match");
--- End diff --

RuntimeExceptions (unchecked exceptions) are usually used to indicate 
programming errors, or (as a workaround) when the scope does not allow throwing 
a checked exception.

This is a case for a checked exception, in my opinion, like an 
`IOException`, `FlinkException`, etc.
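
A minimal sketch of the suggestion, with the registry lookup left out to keep it self-contained; this is not the pull request's implementation, only the magic-byte handling rewritten to use a checked exception.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// A malformed message surfaces as a checked IOException rather than a RuntimeException.
class MagicByteCheckSketch {

	static int readSchemaId(InputStream in) throws IOException {
		DataInputStream dataInputStream = new DataInputStream(in);
		if (dataInputStream.readByte() != 0) {
			// the caller can catch this and add context before failing the record
			throw new IOException("Unknown data format. Magic number does not match");
		}
		// the Confluent wire format follows the magic byte with a 4-byte schema id
		return dataInputStream.readInt();
	}
}
```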


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476136#comment-16476136
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188353074
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -79,15 +87,16 @@
 
 	//  runtime fields, non-serializable, lazily initialized ---
 
-	private transient SpecificDatumWriter<T> writer;
-	private transient SpecificDatumReader<T> reader;
+	private transient GenericDatumWriter<T> writer;
+	private transient GenericDatumReader<T> reader;
 
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
 
-   private transient SpecificData avroData;
+   private transient GenericData avroData;
 
private transient Schema schema;
+   private final String schemaString;
--- End diff --

As per the comments, the existing code orders config fields before runtime 
fields. Can you place the schema field so that it matches that pattern?
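
A short sketch of the ordering convention being asked for; names and types are simplified and not the actual AvroSerializer fields.

```java
import org.apache.avro.Schema;

// Serializable configuration fields first, transient runtime fields after.
class FieldOrderSketch<T> implements java.io.Serializable {

	private static final long serialVersionUID = 1L;

	// -------- configuration fields, serializable ------------------------------
	private final Class<T> type;
	private final String schemaString;

	// -------- runtime fields, non-serializable, lazily initialized ------------
	private transient Schema schema;

	FieldOrderSketch(Class<T> type, String schemaString) {
		this.type = type;
		this.schemaString = schemaString;
	}
}
```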


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476121#comment-16476121
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188337378
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
--- End diff --

This should have a serialVersionUID.
You can activate the respective inspections in IntelliJ to warn about such 
issues.
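
A minimal sketch of what is being asked for; the value 1L is a placeholder, not the UID used in the project.

```java
import java.io.Serializable;

// Pin an explicit serialVersionUID on any Serializable class shipped with the job graph.
class SerialVersionUidSketch implements Serializable {

	private static final long serialVersionUID = 1L;
}
```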


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

