[jira] [Assigned] (FLINK-10463) Null literal cannot be properly parsed in Java Table API function call

2018-10-19 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10463:
---

Assignee: Hequn Cheng

> Null literal cannot be properly parsed in Java Table API function call
> --
>
> Key: FLINK-10463
> URL: https://issues.apache.org/jira/browse/FLINK-10463
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Hequn Cheng
>Priority: Major
>
> For example, the expression `Null(STRING).regexpReplace('oo|ar', '')` throws 
> the following exception.
> {code:java}
> org.apache.flink.table.api.ExpressionParserException: Could not parse 
> expression at column 13: string matching regex 
> `(?i)\Qas\E(?![_$\p{javaJavaIdentifierPart}])' expected but `.' found
> Null(STRING).regexpReplace('oo|ar', '')
> ^
>   at 
> org.apache.flink.table.expressions.ExpressionParser$.throwError(ExpressionParser.scala:576)
>   at 
> org.apache.flink.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:569)
> {code}
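For reference, a minimal reproduction sketch (not from the ticket): {{tab}} is an assumed Table with a STRING column. The string expression below goes through the same {{ExpressionParser}}; per the error above, the parser expects an "as" alias right after Null(STRING) and fails on the following "." call.
{code:java}
// Hedged sketch only; "tab" and its schema are assumptions for illustration.
Table result = tab.select("Null(STRING).regexpReplace('oo|ar', '')");
{code}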



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657704#comment-16657704
 ] 

ASF GitHub Bot commented on FLINK-10205:


TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431546174
 
 
   FYI there was another 
[discussion(FLINK-10038)](https://issues.apache.org/jira/browse/FLINK-10038) on 
`InputSplit` (creation and) assignment.
   I prefer the idea of moving the whole `InputSplit` assignment into a single 
task. It would get splits checkpointed automatically and simplify the currently 
complex JM. However, new nodes might introduce unexpected behavior into the graph, 
and besides we would have to implement some "pull" semantics for split nodes.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Job: InputSplit Fault tolerant for DataSourceTask
> ---
>
> Key: FLINK-10205
> URL: https://issues.apache.org/jira/browse/FLINK-10205
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: 1.6.1, 1.6.2, 1.7.0
>Reporter: JIN SUN
>Assignee: JIN SUN
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> Today the DataSourceTask pulls InputSplits from the JobManager to achieve better 
> performance. However, when a DataSourceTask fails and is rerun, it will not get 
> the same splits as its previous attempt. This can introduce inconsistent 
> results or even data corruption.
> Furthermore, if two executions run at the same time (in the batch 
> scenario), these two executions should process the same splits.
> We need to fix this to make the inputs of a DataSourceTask 
> deterministic. The proposal is to save all splits into the ExecutionVertex and 
> have the DataSourceTask pull its splits from there.
>  document:
> [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing]
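A rough sketch of the proposed direction follows; the class and method names are illustrative only, not the actual implementation. The idea is that the vertex records the splits it has handed out and replays them to a restarted task before serving any new ones, so every attempt sees the same inputs.
{code:java}
// Hedged sketch; "AssignedSplitStore" and its methods are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.flink.core.io.InputSplit;

class AssignedSplitStore {
    // splits already handed to this ExecutionVertex, in assignment order
    private final List<InputSplit> assigned = new ArrayList<>();
    private int replayPosition = 0;

    // called when the task requests its next split; after a restart the
    // recorded splits are replayed first, then fresh ones are recorded
    InputSplit nextSplitFor(Supplier<InputSplit> newSplitSource) {
        if (replayPosition < assigned.size()) {
            return assigned.get(replayPosition++);
        }
        InputSplit fresh = newSplitSource.get();
        if (fresh != null) {
            assigned.add(fresh);
            replayPosition = assigned.size();
        }
        return fresh;
    }

    // reset on task restart so the recorded splits are replayed
    void resetForNewExecution() {
        replayPosition = 0;
    }
}
{code}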



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource…

2018-10-19 Thread GitBox
TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit 
Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6684#issuecomment-431546174
 
 
   FYI there was another 
[discussion(FLINK-10038)](https://issues.apache.org/jira/browse/FLINK-10038) on 
`InputSplit` (creation and) assignment.
   I prefer the idea of moving the whole `InputSplit` assignment into a single 
task. It would get splits checkpointed automatically and simplify the currently 
complex JM. However, new nodes might introduce unexpected behavior into the graph, 
and besides we would have to implement some "pull" semantics for split nodes.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10558) Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base

2018-10-19 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657694#comment-16657694
 ] 

TisonKun commented on FLINK-10558:
--

[~till.rohrmann] I'd like to know whether the present status is expected, given what 
[~yanghua] commented on GitHub.

bq. @tillrohrmann For the test case testTaskManagerFailure, the current test mode 
(based on special messages printed to stdout and stderr) cannot verify the FLIP-6 
scenario. Because it does not pre-start a TM, I can only submit a job to drive 
the TM registration, but this causes the job's output to flush out Flink's 
output, since they share stdout and stderr. Therefore, many of the assertions 
will not take effect.

I wonder whether, in FLIP-6 YARN session mode, we'd like to (really) pre-start TMs; 
if so, we have to mark that the workaround (submitting a job to drive the TM 
registration) is temporary.

> Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new 
> code base
> ---
>
> Key: FLINK-10558
> URL: https://issues.apache.org/jira/browse/FLINK-10558
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: TisonKun
>Priority: Minor
>
> {{YARNHighAvailabilityITCase}}, 
> {{YARNSessionCapacitySchedulerITCase#testClientStartup,}} 
> {{YARNSessionCapacitySchedulerITCase#testTaskManagerFailure}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657653#comment-16657653
 ] 

ASF GitHub Bot commented on FLINK-10461:


klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-431539541
 
 
   Hi @azagrebin, could you please help to review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Speed up download file procedure when restore 
> --
>
> Key: FLINK-10461
> URL: https://issues.apache.org/jira/browse/FLINK-10461
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>  Labels: pull-request-available
>
> In the current master branch, the restore procedure downloads files from DFS in a 
> single thread; this could be sped up by using multiple threads to download 
> states from DFS.
>  
> In my company, the states grow to several terabytes, so the restore 
> procedure becomes rather slow. After a bit of digging, I found that states are 
> downloaded from DFS with a single thread, and this could use multiple threads to 
> speed it up.
> I measured the time needed to download ~2 terabytes of states from DFS. 
> With a single thread it took 640+ s, and 130+ s when using 5 threads for the download.
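A minimal sketch of the kind of parallel download the description suggests (the class name, thread count, and the {{copyToLocal}} helper are assumptions for illustration, not the actual PR code):
{code:java}
// Hedged sketch: download a set of DFS files with a fixed-size thread pool.
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelStateDownload {

    static void downloadInParallel(List<Path> remoteFiles, Path localDir, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Path remote : remoteFiles) {
                pending.add(pool.submit(() -> {
                    copyToLocal(remote, localDir); // one file per task
                    return null;
                }));
            }
            for (Future<?> f : pending) {
                f.get(); // propagate the first failure instead of swallowing it
            }
        } finally {
            pool.shutdown();
        }
    }

    private static void copyToLocal(Path remote, Path localDir) {
        // hypothetical placeholder for the actual DFS read / local write
    }
}
{code}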



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-10-19 Thread GitBox
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-431539541
 
 
   Hi @azagrebin, could you please help to review this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-19 Thread GitBox
yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431537500
 
 
   Failed case: testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify, 
log file: https://api.travis-ci.org/v3/job/443777257/log.txt


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657639#comment-16657639
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431537500
 
 
   Failed case: testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify, 
log file: https://api.travis-ci.org/v3/job/443777257/log.txt


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem from FLINK-10396; the constant was set to true in 
> that PR. Currently it has three usage scenarios:
> 1. In an assumption, which now always fails:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. In {{if (!isNewMode)}} guards: the logic in the block is never invoked, so the if 
> block can be removed.
> 3. In {{if (isNewMode)}} guards: the block is always invoked, so the if statement can be removed.
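To illustrate the second and third scenarios concretely (the helper methods below are made up for illustration, not taken from YarnTestBase), the cleanup boils down to deleting dead branches and inlining the live ones:
{code:java}
// Hedged illustration only; the helper methods are hypothetical.
class IsNewModeCleanupExample {
    private static final boolean isNewMode = true; // the constant FLINK-10396 left behind

    void before() {
        if (!isNewMode) {
            startLegacyTaskManagers(); // scenario 2: dead code, the block can be deleted
        }
        if (isNewMode) {
            submitJobToStartTaskManagers(); // scenario 3: always taken
        }
    }

    void after() {
        // scenario 2 removed entirely, scenario 3 inlined without the guard
        submitJobToStartTaskManagers();
    }

    private void startLegacyTaskManagers() {}
    private void submitJobToStartTaskManagers() {}
}
{code}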



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10619) config MemoryStateBackend default value

2018-10-19 Thread yangxiaoshuo (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657627#comment-16657627
 ] 

yangxiaoshuo commented on FLINK-10619:
--

If it is allowed, I would like to fix it.

> config MemoryStateBackend default value
> ---
>
> Key: FLINK-10619
> URL: https://issues.apache.org/jira/browse/FLINK-10619
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: yangxiaoshuo
>Assignee: yangxiaoshuo
>Priority: Minor
>  Labels: starter
>
> The default MAX_STATE_SIZE of MemoryStateBackend is 5 * 1024 * 1024 (5 MB).
> If we want to change it, the only way is
> {code:java}
> env.setStateBackend(new MemoryStateBackend(1024 * 1024 * 1024));{code}
> Is that right?
> If so, shall we add it to the configuration file?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10619) config MemoryStateBackend default value

2018-10-19 Thread yangxiaoshuo (JIRA)
yangxiaoshuo created FLINK-10619:


 Summary: config MemoryStateBackend default value
 Key: FLINK-10619
 URL: https://issues.apache.org/jira/browse/FLINK-10619
 Project: Flink
  Issue Type: Improvement
  Components: Configuration
Reporter: yangxiaoshuo
Assignee: yangxiaoshuo


The default MAX_STATE_SIZE of MemoryStateBackend is 5 * 1024 * 1024 (5 MB).

If we want to change it, the only way is
{code:java}
env.setStateBackend(new MemoryStateBackend(1024 * 1024 * 1024));{code}
Is that right?
If so, shall we add it to the configuration file?
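A minimal sketch of what the proposed configuration hook could look like (the key name "state.backend.memory.max-state-size" is an assumption for illustration, not an existing Flink option):
{code:java}
// Hedged sketch: read the max state size from flink-conf.yaml instead of hard-coding it.
// The configuration key below is hypothetical.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigurableMemoryBackendExample {
    public static void main(String[] args) {
        Configuration conf = GlobalConfiguration.loadConfiguration();
        int maxStateSize = conf.getInteger(
                "state.backend.memory.max-state-size",
                MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new MemoryStateBackend(maxStateSize));
    }
}
{code}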



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10618) Introduce catalog for Flink tables

2018-10-19 Thread Xuefu Zhang (JIRA)
Xuefu Zhang created FLINK-10618:
---

 Summary: Introduce catalog for Flink tables
 Key: FLINK-10618
 URL: https://issues.apache.org/jira/browse/FLINK-10618
 Project: Flink
  Issue Type: New Feature
  Components: SQL Client
Affects Versions: 1.6.1
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang


Besides meta objects such as tables that may come from an {{ExternalCatalog}}, 
Flink also deals with tables/views/functions that are created on the fly (in 
memory) or specified in a configuration file. Those objects don't belong to 
any {{ExternalCatalog}}, yet Flink either stores them in memory, which is 
non-persistent, or recreates them from a file, which is a big pain for the 
user. Those objects are only known to Flink, but Flink has poor management of 
them.

Since they are typical objects in a database catalog, it's natural to have a 
catalog that manages them. The interface will be similar to 
{{ExternalCatalog}}, which contains meta objects that are not managed by Flink. 
There are several possible implementations of the Flink internal catalog 
interface: memory, file, external registry (such as the Confluent Schema Registry 
or the Hive metastore), relational database, etc. 

The initial functionality as well as the catalog hierarchy could be very 
simple. The basic functionality of the catalog will be mostly creating, altering, 
and dropping tables, views, functions, etc. Obviously, this can evolve over 
time.

We plan to provide implementations backed by memory, files, and the Hive 
metastore, and they will be plugged in at the SQL Client layer.

Please provide your feedback.
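To make the proposal concrete, here is a rough sketch of what such an internal catalog interface might look like; all names and signatures are illustrative assumptions, not the actual design:
{code:java}
// Hedged sketch of a possible internal catalog interface; everything here is hypothetical.
import java.util.List;

interface FlinkCatalog {

    // tables and views
    void createTable(String name, CatalogTable table, boolean ignoreIfExists);
    void alterTable(String name, CatalogTable newTable, boolean ignoreIfNotExists);
    void dropTable(String name, boolean ignoreIfNotExists);
    CatalogTable getTable(String name);
    List<String> listTables();

    // functions
    void createFunction(String name, CatalogFunction function, boolean ignoreIfExists);
    void dropFunction(String name, boolean ignoreIfNotExists);
    List<String> listFunctions();
}

// Marker types standing in for whatever metadata classes the real design would use.
interface CatalogTable {}
interface CatalogFunction {}
{code}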





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10617:
--

 Summary: Restoring job fails because of slot allocation timeout
 Key: FLINK-10617
 URL: https://issues.apache.org/jira/browse/FLINK-10617
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager, TaskManager
Affects Versions: 1.6.1
Reporter: Elias Levy


The following may be related to FLINK-9932, but I am unsure. If you believe it 
is, go ahead and close this issue as a duplicate.

While trying to test local state recovery on a job with large state, the job 
failed to be restored because slot allocation timed out.

The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
per node).  The job has parallelism of 96, so it consumes all of the slots, and 
has ~200 GB of state in RocksDB.  

To test local state recovery I decided to kill one of the TMs.  The TM 
immediately restarted and re-registered with the JM.  I confirmed the JM showed 
96 registered task slots.
{noformat}
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Resolved ResourceManager address, beginning registration
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Registration at ResourceManager attempt 1 (timeout=100ms)
21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Successful registration at resource manager 
akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 
302988dea6afbd613bb2f96429b65d18.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,668 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Registration at JobManager attempt 1 (timeout=100ms)
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,688 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,688 INFO  
org.apache.flink.runtim

[jira] [Commented] (FLINK-9761) Potential buffer leak in PartitionRequestClientHandler during job failures

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657306#comment-16657306
 ] 

Till Rohrmann commented on FLINK-9761:
--

[~NicoK] could you verify whether this is really a problem?

> Potential buffer leak in PartitionRequestClientHandler during job failures
> --
>
> Key: FLINK-9761
> URL: https://issues.apache.org/jira/browse/FLINK-9761
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Critical
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> {{PartitionRequestClientHandler#stagedMessages}} may be accessed from 
> multiple threads:
> 1) Netty's IO thread
> 2) During cancellation, 
> {{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} is 
> called
> If {{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} 
> thinks that {{stagedMessages}} is empty, however, it will not install the 
> {{stagedMessagesHandler}} that consumes and releases buffers from received 
> messages.
> Unless some unexpected combination of code calls prevents this from 
> happening, this would leak the non-recycled buffers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657305#comment-16657305
 ] 

Till Rohrmann commented on FLINK-8418:
--

Do you think we can close this issue since it has not reappeared, 
[~tzulitai]?

> Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
> --
>
> Key: FLINK-8418
> URL: https://issues.apache.org/jira/browse/FLINK-8418
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Instance: https://travis-ci.org/kl0u/flink/builds/327733085



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8073.
--
   Resolution: Cannot Reproduce
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)
   (was: 1.7.0)

The problem did not reappear for quite some time. Closing. If it reappears, 
we should reopen this issue.

> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8452) BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter instable on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8452.
--
   Resolution: Cannot Reproduce
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)
   (was: 1.7.0)

The problem did not reappear for quite some time.

> BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter instable on 
> Travis
> 
>
> Key: FLINK-8452
> URL: https://issues.apache.org/jira/browse/FLINK-8452
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter}} 
> seems to be instable on Travis:
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/330261310



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8408) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8408.
--
   Resolution: Cannot Reproduce
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)
   (was: 1.7.0)

Closing because the problem has not occurred again for quite some time.

> YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis
> --
>
> Key: FLINK-8408
> URL: https://issues.apache.org/jira/browse/FLINK-8408
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n}} is unstable 
> on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327216460



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8779) ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8779.
--
   Resolution: Cannot Reproduce
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)

The test case has been ported to the FLIP-6 code base. If the problem 
reoccurs, we should reopen this issue.

> ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis
> 
>
> Key: FLINK-8779
> URL: https://issues.apache.org/jira/browse/FLINK-8779
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{ClassLoaderITCase.testKMeansJobWithCustomClassLoader}} fails on Travis 
> by producing no output for 300 s. This might indicate a test instability or a 
> problem with Flink that was recently introduced.
> https://api.travis-ci.org/v3/job/344427688/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8034) ProcessFailureCancelingITCase.testCancelingOnProcessFailure failing on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8034.
--
   Resolution: Not A Problem
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)
   (was: 1.7.0)

The test case has been ported to the FLIP-6 code base. If the problem 
reoccurs, we can reopen this issue.

> ProcessFailureCancelingITCase.testCancelingOnProcessFailure failing on Travis
> -
>
> Key: FLINK-8034
> URL: https://issues.apache.org/jira/browse/FLINK-8034
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> The {{ProcessFailureCancelingITCase.testCancelingOnProcessFailure}} is 
> failing on Travis spuriously.
> https://travis-ci.org/tillrohrmann/flink/jobs/299075703



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8779) ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8779:
-
Fix Version/s: (was: 1.7.0)

> ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis
> 
>
> Key: FLINK-8779
> URL: https://issues.apache.org/jira/browse/FLINK-8779
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3
>
>
> The {{ClassLoaderITCase.testKMeansJobWithCustomClassLoader}} fails on Travis 
> by producing no output for 300 s. This might indicate a test instability or a 
> problem with Flink that was recently introduced.
> https://api.travis-ci.org/v3/job/344427688/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7351) test instability in JobClientActorRecoveryITCase#testJobClientRecovery

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657237#comment-16657237
 ] 

Till Rohrmann commented on FLINK-7351:
--

No longer relevant for 1.7 since we disabled the legacy mode in this version.

> test instability in JobClientActorRecoveryITCase#testJobClientRecovery
> --
>
> Key: FLINK-7351
> URL: https://issues.apache.org/jira/browse/FLINK-7351
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, Tests
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3
>
>
> On a 16-core VM, the following test failed during {{mvn clean verify}}
> {code}
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 22.814 sec 
> <<< FAILURE! - in org.apache.flink.runtime.client.JobClientActorRecoveryITCase
> testJobClientRecovery(org.apache.flink.runtime.client.JobClientActorRecoveryITCase)
>   Time elapsed: 21.299 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Resources available to scheduler: Number of instances=0, total 
> number of slots=0, available slots=0
> at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:334)
> at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:368)
> at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:309)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:450)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleLazy(ExecutionGraph.java:834)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:814)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1425)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
Th

[jira] [Resolved] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9764.
--
Resolution: Cannot Reproduce

So far this issue has never been reported again and could not be reproduced. 
Thus, I will close it for the moment. In case it reoccurs, please reopen 
this issue.

> Failure in LocalRecoveryRocksDBFullITCase
> -
>
> Key: FLINK-9764
> URL: https://issues.apache.org/jira/browse/FLINK-9764
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming, Tests
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.3, 1.7.0
>
>
> {code}
> Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
> Starting null#executeTest.
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but 
> was:<1209>
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> 
> but was:<1209>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:5

[jira] [Resolved] (FLINK-10614) Update test_batch_allround.sh e2e to new testing infrastructure

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10614.
---
Resolution: Fixed

Fixed via
1.7.0: 
https://github.com/apache/flink/commit/0fd7bd4f560f61077f72638a69a9102e33841365
1.6.3: 
https://github.com/apache/flink/commit/c0c9f39504a207d60d43cd7eea206acffe2cf4da
1.5.6: 
https://github.com/apache/flink/commit/b0b1548d5476d8a1e3ccf89a180773e725b8e5f6

> Update test_batch_allround.sh e2e to new testing infrastructure
> ---
>
> Key: FLINK-10614
> URL: https://issues.apache.org/jira/browse/FLINK-10614
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{test_batch_allround.sh}} still performs clean-up tasks that are also done by 
> {{test-runner-common.sh}}. This is no longer needed and should be the 
> responsibility of the latter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up

2018-10-19 Thread GitBox
tillrohrmann closed pull request #6886: [BP-1.5][FLINK-10614] Let 
test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6886
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index 0f4d5bade30..c4a08586be9 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -400,6 +400,15 @@ function tm_watchdog {
   done
 }
 
+function start_taskmanagers {
+tmnum=$1
+echo "Start ${tmnum} more task managers"
+for (( c=0; c> 
$FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 
-backup_config
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath 
$TEST_DATA_DIR/out/dataset_allround
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann closed pull request #6885: [BP-1.6][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up

2018-10-19 Thread GitBox
tillrohrmann closed pull request #6885: [BP-1.6][FLINK-10614] Let 
test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6885
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh 
b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath 
$TEST_DATA_DIR/out/dataset_allround
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10614) Update test_batch_allround.sh e2e to new testing infrastructure

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657215#comment-16657215
 ] 

ASF GitHub Bot commented on FLINK-10614:


tillrohrmann closed pull request #6884: [FLINK-10614] Let 
test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6884
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh 
b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath 
$TEST_DATA_DIR/out/dataset_allround
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update test_batch_allround.sh e2e to new testing infrastructure
> ---
>
> Key: FLINK-10614
> URL: https://issues.apache.org/jira/browse/FLINK-10614
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{test_batch_allround.sh}} still performs clean-up tasks that are also done by 
> {{test-runner-common.sh}}. This is no longer needed and should be the 
> responsibility of the latter.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann closed pull request #6884: [FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up

2018-10-19 Thread GitBox
tillrohrmann closed pull request #6884: [FLINK-10614] Let 
test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6884
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh 
b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath 
$TEST_DATA_DIR/out/dataset_allround
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10558) Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base

2018-10-19 Thread TisonKun (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TisonKun reassigned FLINK-10558:


Assignee: TisonKun

> Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new 
> code base
> ---
>
> Key: FLINK-10558
> URL: https://issues.apache.org/jira/browse/FLINK-10558
> Project: Flink
>  Issue Type: Sub-task
>Reporter: vinoyang
>Assignee: TisonKun
>Priority: Minor
>
> {{YARNHighAvailabilityITCase}}, 
> {{YARNSessionCapacitySchedulerITCase#testClientStartup,}} 
> {{YARNSessionCapacitySchedulerITCase#testTaskManagerFailure}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7351) test instability in JobClientActorRecoveryITCase#testJobClientRecovery

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7351:
-
Fix Version/s: (was: 1.7.0)

> test instability in JobClientActorRecoveryITCase#testJobClientRecovery
> --
>
> Key: FLINK-7351
> URL: https://issues.apache.org/jira/browse/FLINK-7351
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, Tests
>Affects Versions: 1.3.2, 1.4.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3
>
>
> On a 16-core VM, the following test failed during {{mvn clean verify}}
> {code}
> Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 22.814 sec 
> <<< FAILURE! - in org.apache.flink.runtime.client.JobClientActorRecoveryITCase
> testJobClientRecovery(org.apache.flink.runtime.client.JobClientActorRecoveryITCase)
>   Time elapsed: 21.299 sec  <<< ERROR!
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Not enough free slots available to run the job. You can decrease the operator 
> parallelism or increase the number of slots per TaskManager in the 
> configuration. Resources available to scheduler: Number of instances=0, total 
> number of slots=0, available slots=0
> at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:334)
> at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:139)
> at 
> org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:368)
> at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:309)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:450)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleLazy(ExecutionGraph.java:834)
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:814)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1425)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9000) Add automated cluster framework tests

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9000:
-
Fix Version/s: (was: 1.7.0)

> Add automated cluster framework tests
> -
>
> Key: FLINK-9000
> URL: https://issues.apache.org/jira/browse/FLINK-9000
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> Similar to the end-to-end tests which are added to the 
> {{flink-end-to-end-tests}} module, we should add cluster framework tests 
> which we can run automatically.
> One idea for launching a cluster framework would be to use Terraform to 
> allocate nodes from AWS or GCE. Next, we could set up a different cluster 
> framework such as YARN or Mesos on which to run Flink. To simulate 
> different failure scenarios, a framework like Jepsen could be used.
> This issue serves as an umbrella issue to collect all issues which are 
> related to cluster framework tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9000) Add automated cluster framework tests

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9000:
-
Fix Version/s: 1.8.0

> Add automated cluster framework tests
> -
>
> Key: FLINK-9000
> URL: https://issues.apache.org/jira/browse/FLINK-9000
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.8.0
>
>
> Similar to the end-to-end tests which are added to the 
> {{flink-end-to-end-tests}} module, we should add cluster framework tests 
> which we can run automatically.
> One idea for launching a cluster framework would be to use Terraform to 
> allocate nodes from AWS or GCE. Next, we could set up a different cluster 
> framework such as YARN or Mesos on which to run Flink. To simulate 
> different failure scenarios, a framework like Jepsen could be used.
> This issue serves as an umbrella issue to collect all issues which are 
> related to cluster framework tests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657026#comment-16657026
 ] 

ASF GitHub Bot commented on FLINK-10527:


yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431419776
 
 
   @tillrohrmann I have updated this PR; it now keeps only the change for 
class `YARNSessionFIFOITCase`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup constant isNewMode in YarnTestBase
> --
>
> Key: FLINK-10527
> URL: https://issues.apache.org/jira/browse/FLINK-10527
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> This seems to be a residual problem from FLINK-10396, where the constant was 
> set to true. Currently it has three usage scenarios:
> 1. In an assumption, which now causes an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. In {{if (!isNewMode)}} blocks, whose logic is never invoked, so the 
> blocks can be removed.
> 3. In {{if (isNewMode)}} blocks, whose logic is always invoked, so the if 
> statements can be removed (see the sketch below).
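> A minimal sketch of the intended cleanup (the enclosing class and the 
> printed messages are illustrative stand-ins, not the actual YarnTestBase 
> contents):
> {code:java}
> // Hypothetical stand-in that only illustrates scenarios 2 and 3.
> public class IsNewModeCleanupSketch {
>
>     // In YarnTestBase this constant is always true since FLINK-10396.
>     private static final boolean isNewMode = true;
>
>     public static void main(String[] args) {
>         // Scenario 2: the body of "if (!isNewMode)" can never run -> delete the whole block.
>         if (!isNewMode) {
>             System.out.println("legacy-mode setup (dead code)");
>         }
>
>         // Scenario 3: the body of "if (isNewMode)" always runs -> keep the body, drop the if.
>         if (isNewMode) {
>             System.out.println("new-mode setup (always executed)");
>         }
>     }
> }
> {code}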



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase

2018-10-19 Thread GitBox
yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in 
YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431419776
 
 
   @tillrohrmann I have updated this PR; it now keeps only the change for 
class `YARNSessionFIFOITCase`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch

2018-10-19 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-10357:


Assignee: Gary Yao

> Streaming File Sink end-to-end test failed with mismatch
> 
>
> Key: FLINK-10357
> URL: https://issues.apache.org/jira/browse/FLINK-10357
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.3, 1.7.0
>
> Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz
>
>
> The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with 
> the following result: 
> {code}
> FAIL File Streaming Sink: Output hash mismatch.  Got 
> f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f.
> head hexdump of actual:
> 000   0  \n   1  \n   2  \n   3  \n   4  \n   5  \n   6  \n   7  \n
> 010   8  \n   9  \n
> 014
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10616) Jepsen test fails while tearing down Hadoop

2018-10-19 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-10616:
-
Description: 
While tearing down Hadoop, the tests sporadically fail with the exception below:

{noformat}

Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | 
grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned 
non-zero exit status 123 on 172.31.39.235. STDOUT:


STDERR:

    at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec_STAR_.doInvoke(control.clj:163) 
~[jepsen-0.1.10.jar:na]
    at clojure.lang.RestFn.applyTo(RestFn.java:137) [clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na]
    at clojure.core$apply.invoke(core.clj:652) ~[clojure-1.9.0.jar:na]
    at jepsen.control$exec.invokeStatic(control.clj:182) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec.doInvoke(control.clj:176) ~[jepsen-0.1.10.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:2088) [clojure-1.9.0.jar:na]
    at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:197) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:194) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) 
~[classes/:na]
    at jepsen.flink.hadoop$db$reify__3102.teardown_BANG_(hadoop.clj:128) 
~[classes/:na]
    at jepsen.flink.db$combined_db$reify__217$fn__220.invoke(db.clj:119) 
~[na:na]
    at clojure.core$map$fn__5587.invoke(core.clj:2745) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.9.0.jar:na]
    at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.9.0.jar:na]
    at clojure.lang.RT.seq(RT.java:528) ~[clojure-1.9.0.jar:na]
    at clojure.core$seq__5124.invokeStatic(core.clj:137) 
~[clojure-1.9.0.jar:na]
    at clojure.core$dorun.invokeStatic(core.clj:3125) 
~[clojure-1.9.0.jar:na]
    at clojure.core$doall.invokeStatic(core.clj:3140) 
~[clojure-1.9.0.jar:na]
    at clojure.core$doall.invoke(core.clj:3140) ~[clojure-1.9.0.jar:na]
    at jepsen.flink.db$combined_db$reify__217.teardown_BANG_(db.clj:119) 
~[na:na]
    at jepsen.db$fn__2137$G__2133__2141.invoke(db.clj:8) 
~[jepsen-0.1.10.jar:na]
    at jepsen.db$fn__2137$G__2132__2146.invoke(db.clj:8) 
~[jepsen-0.1.10.jar:na]
    at clojure.core$partial$fn__5561.invoke(core.clj:2617) 
~[clojure-1.9.0.jar:na]
    at jepsen.control$on_nodes$fn__2116.invoke(control.clj:372) 
~[jepsen-0.1.10.jar:na]
    at clojure.lang.AFn.applyToHelper(AFn.java:154) ~[clojure-1.9.0.jar:na]
    at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na]
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965) 
~[clojure-1.9.0.jar:na]
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.RestFn.applyTo(RestFn.java:142) [clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:661) ~[clojure-1.9.0.jar:na]
    at clojure.core$bound_fn_STAR_$fn__5471.doInvoke(core.clj:1995) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:408) [clojure-1.9.0.jar:na]
    at jepsen.util$real_pmap$launcher__1168$fn__1169.invoke(util.clj:49) 
~[jepsen-0.1.10.jar:na]
    at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.AFn.call(AFn.java:18) ~[clojure-1.9.0.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_171]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_171]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]

{noformat}

  was:
While tearing down Hadoop, the test fails with the exception below:

{noformat}

Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | 
grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned 
non-zero exit status 123 on 172.31.39.235. STDOUT:


STDERR:

    at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) 
~[jepsen-0.1.10.jar:na]
  

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656964#comment-16656964
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226691526
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
 
 Review comment:
   Sometimes it can be a good idea to separate multiple precondition checks 
into a separate method so that it is easier to focus on the main logic of the 
function.
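   A minimal, generic sketch of that refactoring (the helper name 
`checkRestoredStateCompatibility` and the simplified stand-in types are 
hypothetical, not the actual Flink classes):
{code:java}
import java.util.Objects;

// Simplified stand-in types; the real code uses StateMetaInfoSnapshot and StateDescriptor.
class RestoredSnapshot {
    final String name;
    RestoredSnapshot(String name) { this.name = name; }
}

class Descriptor {
    final String name;
    Descriptor(String name) { this.name = name; }
}

public class PreconditionExtractionSketch {

    // Hypothetical helper that bundles the precondition checks so that the
    // caller can focus on the actual migration logic.
    private static void checkRestoredStateCompatibility(RestoredSnapshot restored, Descriptor desc) {
        if (restored == null) {
            throw new IllegalStateException(
                "Requested to check compatibility, but the restored snapshot cannot be found.");
        }
        if (!Objects.equals(desc.name, restored.name)) {
            throw new IllegalStateException(
                "Incompatible state names. Was [" + restored.name + "], registered with [" + desc.name + "].");
        }
    }

    static void migrateStateIfNecessary(RestoredSnapshot restored, Descriptor desc) {
        checkRestoredStateCompatibility(restored, desc);
        // ... main migration logic continues here, uncluttered by the checks ...
    }

    public static void main(String[] args) {
        migrateStateIfNecessary(new RestoredSnapshot("count"), new Descriptor("count"));
        System.out.println("preconditions passed");
    }
}
{code}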


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656953#comment-16656953
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226669120
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
 
 Review comment:
   It seems that now both branches in the calling method 
`tryRegisterKvStateInformation` create a `newMetaInfo`. Why not always create 
it there and pass it as the only argument to this function, which can then 
transform it if needed? That could deduplicate and simplify some code.
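   A minimal sketch of that shape (the stand-in type and method bodies are 
hypothetical, not the actual backend code):
{code:java}
// Simplified stand-in for RegisteredKeyValueStateBackendMetaInfo.
class MetaInfo {
    final String name;
    MetaInfo(String name) { this.name = name; }
}

public class SingleMetaInfoArgumentSketch {

    // The caller always builds the new meta info itself ...
    static MetaInfo tryRegisterState(String stateName, boolean restoring) {
        MetaInfo newMetaInfo = new MetaInfo(stateName);
        if (restoring) {
            migrateStateIfNecessary(newMetaInfo); // ... and hands it over only when restoring.
        }
        return newMetaInfo;
    }

    // The migration path receives the freshly built meta info as its only argument
    // and transforms it if needed, instead of constructing a second copy.
    static void migrateStateIfNecessary(MetaInfo newMetaInfo) {
        // compatibility checks / migration would go here
    }

    public static void main(String[] args) {
        System.out.println(tryRegisterState("count", true).name);
    }
}
{code}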


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, 

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656965#comment-16656965
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226689891
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656963#comment-16656963
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226676987
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ##
 @@ -269,10 +269,11 @@ public HeapKeyedStateBackend(
"Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored 
snapshot cannot be found.");
 
-   newMetaInfo = 
RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
-   restoredMetaInfoSnapshot,
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
 
 Review comment:
   We can now move this outside the if and initialize `newMetaInfo` directly 
for all cases to reduce code duplication.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656962#comment-16656962
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226687515
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656957#comment-16656957
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226688632
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656952#comment-16656952
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226685552
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656958#comment-16656958
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226684033
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
+   byte[] serializedOldValue,
+   TypeSerializer priorSerializer,
+   TypeSerializer newSerializer) throws 
StateMigrationException {
+
+   try {
+   V value = priorSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue)));
+
+   ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
 
 Review comment:
   This method is called in a loop. You should avoid creating streams and other 
temporary objects on each invocation. Instead, let the calling loop pass them 
in and reuse the same instances for the whole process. If we had a 
`StateValueTransformer` instance, it could also hold them as members.
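   A minimal sketch of that reuse with plain java.io types (the int-to-long 
conversion merely stands in for reading with the prior serializer and writing 
with the new one; the class and loop are illustrative only):
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ReusedStreamMigrationSketch {

    // The output buffer and view are owned by the calling loop and reused; a fully
    // allocation-free version would also need a re-pointable input view such as
    // Flink's DataInputDeserializer.
    static byte[] migrateSerializedValue(
            byte[] serializedOldValue,
            ByteArrayOutputStream reusableBuffer,
            DataOutputStream reusableOut) throws IOException {

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedOldValue));
        int oldValue = in.readInt();          // "prior serializer"

        reusableBuffer.reset();               // reuse the caller's buffer
        reusableOut.writeLong(oldValue);      // "new serializer"
        reusableOut.flush();
        return reusableBuffer.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> column = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            ByteArrayOutputStream tmp = new ByteArrayOutputStream();
            new DataOutputStream(tmp).writeInt(i);
            column.add(tmp.toByteArray());
        }

        // The loop owns the temporary objects; the migrate call reuses them.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        for (int i = 0; i < column.size(); i++) {
            column.set(i, migrateSerializedValue(column.get(i), buffer, out));
        }
        System.out.println("migrated " + column.size() + " values");
    }
}
{code}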


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656954#comment-16656954
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226662102
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
 
 Review comment:
   This new code is calling methods from a deprecated class. I suggest either 
removing the deprecation or using the newer replacement.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with n

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656959#comment-16656959
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226686907
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656955#comment-16656955
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226680634
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656951#comment-16656951
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226683613
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
+   byte[] serializedOldValue,
+   TypeSerializer priorSerializer,
+   TypeSerializer newSerializer) throws 
StateMigrationException {
+
+   try {
+   V value = priorSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue)));
+
+   ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
 
 Review comment:
   Better use `DataInputDeserializer` / `DataOutputSerializer` instead of 
wrapped byte-array streams. They are quite a bit faster.
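   A minimal round-trip sketch with those classes (`StringSerializer` is used 
purely for illustration; this is not the actual migration code):
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerViewSketch {

    public static void main(String[] args) throws IOException {
        StringSerializer serializer = StringSerializer.INSTANCE;

        // Write side: DataOutputSerializer replaces ByteArrayOutputStream + DataOutputViewStreamWrapper.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize("some state value", out);
        byte[] bytes = out.getCopyOfBuffer();

        // Read side: DataInputDeserializer replaces ByteArrayInputStream + DataInputViewStreamWrapper
        // and can be re-pointed at the next value via setBuffer() when used in a loop.
        DataInputDeserializer in = new DataInputDeserializer(bytes);
        System.out.println(serializer.deserialize(in));
    }
}
{code}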


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656950#comment-16656950
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r22394
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
 
 Review comment:
   I suggest comparing enums with `!=` instead of `!Objects.equals`.
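   For illustration, a tiny self-contained example of the difference (the enum 
here is only a stand-in for `StateDescriptor.Type`):
{code:java}
public class EnumComparisonSketch {

    enum Type { UNKNOWN, VALUE, LIST }

    public static void main(String[] args) {
        Type restoredType = Type.VALUE;
        Type registeredType = Type.VALUE;

        // Enum constants are singletons, so reference comparison with != / == is
        // safe, needs no extra utility call, and reads more directly.
        if (restoredType != Type.UNKNOWN && registeredType != Type.UNKNOWN) {
            System.out.println("types match: " + (restoredType == registeredType));
        }
    }
}
{code}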


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.

[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656960#comment-16656960
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226690598
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
 
 Review comment:
   I have an ongoing effort to shrink the size of the 
`RocksDBKeyedStateBackend` class. I think it is a good idea to move the 
migration concern into a separate class file if possible, to keep different 
concerns separated wherever possible. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656956#comment-16656956
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226663119
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
 
 Review comment:
   This line and the following check are already done by the caller in exactly 
the same way. Wouldn't it make more sense to pass the `StateMetaInfoSnapshot` 
as an argument from the caller instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656961#comment-16656961
 ] 

ASF GitHub Bot commented on FLINK-9808:
---

StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226679606
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
 
 Review comment:
   I think this method does not belong here. This class is used to interact 
with the state during event processing, so I suggest keeping that separated from 
migration concerns. If you are looking for a way to differentiate the logic for 
different states, you could also just use a factory that gives you different 
implementations of some `StateValueTransformer`.
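   To make that concrete, a hedged sketch of such a factory (all names below are 
invented for illustration; `StateValueTransformer` is only proposed in this review 
and is not an existing Flink interface):

   // Sketch only: a factory that hands out per-state transformation strategies,
   // keeping migration logic out of the event-processing state classes.
   interface StateValueTransformer {
       byte[] transform(byte[] serializedOldValue) throws Exception;
   }

   final class StateValueTransformerFactory {

       // no schema change: pass serialized values through untouched
       static StateValueTransformer identity() {
           return value -> value;
       }

       // schema change: delegate to a caller-supplied "read with old serializer,
       // write with new serializer" step
       static StateValueTransformer reserializing(java.util.function.Function<byte[], byte[]> migration) {
           return migration::apply;
       }
   }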


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement state conversion procedure in state backends
> --
>
> Key: FLINK-9808
> URL: https://issues.apache.org/jira/browse/FLINK-9808
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Aljoscha Krettek
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> With FLINK-9377 in place and that config snapshots serve as the single source 
> of truth for recreating restore serializers, the next step would be to 
> utilize this when performing a full-pass state conversion (i.e., read with 
> old / restore serializer, write with new serializer).
> For Flink's heap-based backends, it can be seen that state conversion 
> inherently happens, since all state is always deserialized after restore with 
> the restore serializer, and written with the new serializer on snapshots.
> For the RocksDB state backend, since state is lazily deserialized, state 
> conversion needs to happen for per-registered state on their first access if 
> the registered new serializer has a different serialization schema than the 
> previous serializer.
> This task should consist of three parts:
> 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the 
> new serializer's schema is a) compatible with the serializer as it is, b) 
> compatible after the serializer has been reconfigured, or c) incompatible.
> 2. Introduce state conversion procedures in the RocksDB state backend. This 
> should occur on the first state access.
> 3. Make sure that all other backends no longer do redundant serializer 
> compatibility checks. That is not required because those backends always 
> perform full-pass state conversions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226684033
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
+   byte[] serializedOldValue,
+   TypeSerializer priorSerializer,
+   TypeSerializer newSerializer) throws 
StateMigrationException {
+
+   try {
+   V value = priorSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue)));
+
+   ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
 
 Review comment:
   This method is called in a loop. You should avoid creating streams and other 
temporary objects on each invocation. Instead, let the calling loop pass them 
in and reuse the same instances for the whole process. If we had a 
`StateValueTransformer` instance, it could also hold them as members.
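   As a rough sketch of that reuse pattern (variable names are assumed for 
illustration, not the PR's actual code), the calling loop could own the buffers:

   // Sketch only: hoist the temporary objects out of the per-value call.
   ByteArrayOutputStream baos = new ByteArrayOutputStream();
   DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);

   for (byte[] oldValue : serializedValues) {   // 'serializedValues' is hypothetical
       baos.reset();                            // reuse the same output buffer
       V value = priorSerializer.deserialize(
           new DataInputViewStreamWrapper(new ByteArrayInputStream(oldValue)));
       newSerializer.serialize(value, out);
       out.flush();
       byte[] newValue = baos.toByteArray();
       // ... write newValue back to RocksDB under the same key ...
   }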


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226687515
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226690598
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
 
 Review comment:
   I have an ongoing effort to shrink the size of the 
`RocksDBKeyedStateBackend` class. I think it is a good idea to move the 
migration concern into a separate class file if possible, to keep different 
concerns separated wherever possible. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226662102
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
 
 Review comment:
   This new code is calling methods from a deprecated class. I suggest either 
removing the deprecation or using the newer replacement.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226686907
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226685552
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226683613
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
+   byte[] serializedOldValue,
+   TypeSerializer priorSerializer,
+   TypeSerializer newSerializer) throws 
StateMigrationException {
+
+   try {
+   V value = priorSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue)));
+
+   ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
+   DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
 
 Review comment:
   Better to use `DataInputDeserializer` / `DataOutputSerializer` instead of 
wrapped byte-array streams. They are quite a bit faster.
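   A hedged sketch of what that could look like (simplified, not the PR's code); 
`DataInputDeserializer` and `DataOutputSerializer` live in 
`org.apache.flink.core.memory` and work directly on byte arrays:

   import java.io.IOException;
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.core.memory.DataInputDeserializer;
   import org.apache.flink.core.memory.DataOutputSerializer;

   // Sketch only: migrate one serialized value without wrapped byte-array streams.
   final class ValueMigrationSketch {
       static <V> byte[] migrate(
               byte[] serializedOldValue,
               TypeSerializer<V> priorSerializer,
               TypeSerializer<V> newSerializer) throws IOException {

           DataInputDeserializer in = new DataInputDeserializer(serializedOldValue);
           V value = priorSerializer.deserialize(in);

           // initial buffer size is a guess; the serializer view grows as needed
           DataOutputSerializer out = new DataOutputSerializer(Math.max(64, serializedOldValue.length));
           newSerializer.serialize(value, out);
           return out.getCopyOfBuffer();
       }
   }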


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226669120
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
 
 Review comment:
   It seems that now both branches in the calling method 
`tryRegisterKvStateInformation` create a `newMetaInfo`. Why not always create 
it there and pass it as the only argument to this function, which can then 
transform it if needed? That could deduplicate and simplify some code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226676987
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ##
 @@ -269,10 +269,11 @@ public HeapKeyedStateBackend(
"Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
" but its corresponding restored 
snapshot cannot be found.");
 
-   newMetaInfo = 
RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
-   restoredMetaInfoSnapshot,
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
 
 Review comment:
   We can now move this outside the if and initialize `newMetaInfo` directly 
for all cases to reduce code duplication.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226679606
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) {
return backend.db.get(columnFamily, 
tmpKeySerializationView.getCopyOfBuffer());
}
 
+   public byte[] migrateSerializedValue(
 
 Review comment:
   I think this method does not belong here. This class is used to interact 
with the state during event processing, so I suggest keeping that separated from 
migration concerns. If you are looking for a way to differentiate the logic for 
different states, you could also just use a factory that gives you different 
implementations of some `StateValueTransformer`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226688632
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226663119
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
 
 Review comment:
   This line and the following check are already done by the caller in exactly 
the same way. Wouldn't it make more sense to pass the `StateMetaInfoSnapshot` 
as an argument from the caller instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226691526
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
 
 Review comment:
   Sometimes it can be a good idea to extract multiple precondition checks 
into a separate method so that it is easier to focus on the main logic of the 
function.
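   Purely as an illustration (the helper name below is invented), the checks 
above could collapse into something like:

   // Sketch only: keep the compatibility preconditions in one helper so the
   // migration method reads as: check, resolve compatibility, migrate.
   private void checkRestoredStateMetaInfo(
           StateDescriptor<?, ?> stateDesc,
           StateMetaInfoSnapshot restoredSnapshot) {

       Preconditions.checkState(restoredSnapshot != null,
           "Restored snapshot for state [" + stateDesc.getName() + "] cannot be found.");
       Preconditions.checkState(
           restoredSnapshot.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
           "Incompatible state types.");
       Preconditions.checkState(
           Objects.equals(stateDesc.getName(), restoredSnapshot.getName()),
           "Incompatible state names.");
   }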


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r22394
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
 
 Review comment:
   I suggest comparing enums with `!=` instead of `!Objects.equals`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226680634
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends

2018-10-19 Thread GitBox
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state 
backends] Migrate state when necessary in state backends
URL: https://github.com/apache/flink/pull/6875#discussion_r226689891
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ##
 @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  
RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2 
stateInfo,
+   @Nullable StateSnapshotTransformer 
snapshotTransformer) throws Exception {
+
+   StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   
Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType()
+   == 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE,
+   "Incompatible state types. " +
+   "Was [" + 
restoredMetaInfoSnapshot.getBackendStateType() + "], " +
+   "registered as [" + 
StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "].");
+
+   Preconditions.checkState(
+   Objects.equals(stateDesc.getName(), 
restoredMetaInfoSnapshot.getName()),
+   "Incompatible state names. " +
+   "Was [" + restoredMetaInfoSnapshot.getName() + 
"], " +
+   "registered with [" + stateDesc.getName() + 
"].");
+
+   final StateDescriptor.Type restoredType =
+   StateDescriptor.Type.valueOf(
+   restoredMetaInfoSnapshot.getOption(
+   
StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE));
+
+   if (!Objects.equals(stateDesc.getType(), 
StateDescriptor.Type.UNKNOWN)
+   && !Objects.equals(restoredType, 
StateDescriptor.Type.UNKNOWN)) {
+
+   Preconditions.checkState(
+   stateDesc.getType() == restoredType,
+   "Incompatible key/value state types. " +
+   "Was [" + restoredType + "], " +
+   "registered with [" + 
stateDesc.getType() + "].");
+   }
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer,
+   snapshotTransformer);
+
+   CompatibilityResult namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()),
+   namespaceSerializer);
+
+   CompatibilityResult stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   null,
+   
restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()),
+   stateSerializer);
+
+   if (namespaceCompatibility.isRequiresMigration()) {
+   throw new UnsupportedOperationException("The new 
namespace serializer requires state migration in order for the job to proceed." 
+
+   " However, migration for state namespace 
currently isn't supported.");
+   }
+
+   if (stateCompatibility.isRequiresMigration()) {
+   migrateStateValue(stateDesc, stateInfo, 
restoredMetaInfoSnapshot, newMetaInfo);
+   } else {
+   newMetaInfo = new 
RegisteredKeyValueStateBackendMetaInfo<>(
+   newMetaInfo.getStat

[jira] [Created] (FLINK-10616) Jepsen test fails while tearing down Hadoop

2018-10-19 Thread Gary Yao (JIRA)
Gary Yao created FLINK-10616:


 Summary: Jepsen test fails while tearing down Hadoop
 Key: FLINK-10616
 URL: https://issues.apache.org/jira/browse/FLINK-10616
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.1, 1.7.0
Reporter: Gary Yao


While tearing down Hadoop, the test fails with the exception below:

{noformat}

Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | 
grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned 
non-zero exit status 123 on 172.31.39.235. STDOUT:


STDERR:

    at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec_STAR_.doInvoke(control.clj:163) 
~[jepsen-0.1.10.jar:na]
    at clojure.lang.RestFn.applyTo(RestFn.java:137) [clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na]
    at clojure.core$apply.invoke(core.clj:652) ~[clojure-1.9.0.jar:na]
    at jepsen.control$exec.invokeStatic(control.clj:182) 
~[jepsen-0.1.10.jar:na]
    at jepsen.control$exec.doInvoke(control.clj:176) ~[jepsen-0.1.10.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:2088) [clojure-1.9.0.jar:na]
    at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:197) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:194) 
~[classes/:na]
    at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) 
~[classes/:na]
    at jepsen.flink.hadoop$db$reify__3102.teardown_BANG_(hadoop.clj:128) 
~[classes/:na]
    at jepsen.flink.db$combined_db$reify__217$fn__220.invoke(db.clj:119) 
~[na:na]
    at clojure.core$map$fn__5587.invoke(core.clj:2745) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.9.0.jar:na]
    at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.9.0.jar:na]
    at clojure.lang.RT.seq(RT.java:528) ~[clojure-1.9.0.jar:na]
    at clojure.core$seq__5124.invokeStatic(core.clj:137) 
~[clojure-1.9.0.jar:na]
    at clojure.core$dorun.invokeStatic(core.clj:3125) 
~[clojure-1.9.0.jar:na]
    at clojure.core$doall.invokeStatic(core.clj:3140) 
~[clojure-1.9.0.jar:na]
    at clojure.core$doall.invoke(core.clj:3140) ~[clojure-1.9.0.jar:na]
    at jepsen.flink.db$combined_db$reify__217.teardown_BANG_(db.clj:119) 
~[na:na]
    at jepsen.db$fn__2137$G__2133__2141.invoke(db.clj:8) 
~[jepsen-0.1.10.jar:na]
    at jepsen.db$fn__2137$G__2132__2146.invoke(db.clj:8) 
~[jepsen-0.1.10.jar:na]
    at clojure.core$partial$fn__5561.invoke(core.clj:2617) 
~[clojure-1.9.0.jar:na]
    at jepsen.control$on_nodes$fn__2116.invoke(control.clj:372) 
~[jepsen-0.1.10.jar:na]
    at clojure.lang.AFn.applyToHelper(AFn.java:154) ~[clojure-1.9.0.jar:na]
    at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na]
    at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965) 
~[clojure-1.9.0.jar:na]
    at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.RestFn.applyTo(RestFn.java:142) [clojure-1.9.0.jar:na]
    at clojure.core$apply.invokeStatic(core.clj:661) ~[clojure-1.9.0.jar:na]
    at clojure.core$bound_fn_STAR_$fn__5471.doInvoke(core.clj:1995) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.RestFn.invoke(RestFn.java:408) [clojure-1.9.0.jar:na]
    at jepsen.util$real_pmap$launcher__1168$fn__1169.invoke(util.clj:49) 
~[jepsen-0.1.10.jar:na]
    at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022) 
~[clojure-1.9.0.jar:na]
    at clojure.lang.AFn.call(AFn.java:18) ~[clojure-1.9.0.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
~[na:1.8.0_171]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[na:1.8.0_171]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[na:1.8.0_171]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]

{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann commented on a change in pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up

2018-10-19 Thread GitBox
tillrohrmann commented on a change in pull request #6886: [BP-1.5][FLINK-10614] 
Let test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6886#discussion_r226690899
 
 

 ##
 File path: flink-end-to-end-tests/test-scripts/test_batch_allround.sh
 ##
 @@ -24,31 +24,13 @@ 
TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> 
$FLINK_DIR/conf/flink-conf.yaml
 
 backup_config
 
 Review comment:
   You're right, this command must be moved above the two `echo` lines. The 
config will be reverted by `test-runner-common.sh`. With FLINK-10094 the backup 
moved into this script as well, but that change is not part of `release-1.5`. I 
will move this command before the two echoes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5601:
-
Affects Version/s: 1.7.0

> Window operator does not checkpoint watermarks
> --
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Ufuk Celebi
>Assignee: Jiayi Liao
>Priority: Critical
> Fix For: 1.8.0
>
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that on restore it could happen that late elements were not 
> dropped, because the watermarks needed to be re-established after restore 
> first.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?
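Until the window operator itself snapshots its watermark, one user-level workaround is to remember the last observed watermark in operator state, so that the "is this element late?" decision stays stable across a restore. A minimal sketch, assuming elements that carry their timestamp as a {{Long}} (class and state names are made up for illustration; this is not the window operator fix itself):

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: keeps the highest watermark seen so far in operator state, so
// that late elements are dropped deterministically even right after a restore.
public class WatermarkRememberingFilter
        extends ProcessFunction<Long, Long>
        implements CheckpointedFunction {

    private transient ListState<Long> watermarkState;
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void processElement(Long timestamp, Context ctx, Collector<Long> out) {
        lastWatermark = Math.max(lastWatermark, ctx.timerService().currentWatermark());
        if (timestamp >= lastWatermark) {
            out.collect(timestamp);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        watermarkState.clear();
        watermarkState.add(lastWatermark);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        watermarkState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("last-watermark", Long.class));
        for (Long restored : watermarkState.get()) {
            lastWatermark = Math.max(lastWatermark, restored);
        }
    }
}
{code}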



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5601:
-
Fix Version/s: (was: 1.7.0)
   1.8.0

> Window operator does not checkpoint watermarks
> --
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Ufuk Celebi
>Assignee: Jiayi Liao
>Priority: Critical
> Fix For: 1.8.0
>
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that on restore it could happen that late elements were not 
> dropped, because the watermarks needed to be re-established after restore 
> first.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6856) Eagerly check if serializer config snapshots are deserializable when snapshotting

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656930#comment-16656930
 ] 

Till Rohrmann commented on FLINK-6856:
--

What's the state of this issue, [~tzulitai]? Is it still valid? If so, shall we 
make it a subtask of FLINK-9376?

> Eagerly check if serializer config snapshots are deserializable when 
> snapshotting
> -
>
> Key: FLINK-6856
> URL: https://issues.apache.org/jira/browse/FLINK-6856
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.4.2, 1.5.1, 1.6.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.7.0
>
>
> Currently, if serializer config snapshots are not deserializable (for 
> example, if the user did not correctly include the empty constructor needed 
> for deserialization, or the read/write methods are simply implemented 
> incorrectly), users would only find this out when restoring from the snapshot.
> We could eagerly do a check for this when snapshotting, and fail with a good 
> message indicating that the config snapshot can not be deserialized.
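One way such an eager check could look is a serialize-then-deserialize round trip that runs before the snapshot is written out: write the config snapshot, instantiate a fresh copy via the required empty constructor, and read the bytes back. A hedged sketch using the 1.x snapshot classes (the helper choices and error message are illustrative and may differ between versions):

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.InstantiationUtil;

final class ConfigSnapshotSanityCheck {

    // Fails fast if the snapshot class lacks the empty constructor or has
    // broken read/write methods, instead of failing much later on restore.
    static void checkRoundTrip(TypeSerializerConfigSnapshot snapshot, ClassLoader userClassLoader) {
        try {
            DataOutputSerializer out = new DataOutputSerializer(128);
            snapshot.write(out);

            TypeSerializerConfigSnapshot copy = InstantiationUtil.instantiate(snapshot.getClass());
            copy.setUserCodeClassLoader(userClassLoader);
            copy.read(new DataInputDeserializer(out.getCopyOfBuffer()));
        } catch (Exception e) {
            throw new IllegalStateException(
                "Serializer config snapshot " + snapshot.getClass().getName()
                    + " cannot be deserialized; restoring from this snapshot would fail.", e);
        }
    }
}
{code}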



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7915) Verify functionality of RollingSinkSecuredITCase

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7915.
--
   Resolution: Won't Do
Fix Version/s: (was: 1.5.6)
   (was: 1.6.3)
   (was: 1.7.0)

Won't do since {{RollingSink}} is deprecated.

> Verify functionality of RollingSinkSecuredITCase
> 
>
> Key: FLINK-7915
> URL: https://issues.apache.org/jira/browse/FLINK-7915
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> I recently stumbled across the test {{RollingSinkSecuredITCase}} which will 
> only be executed for Hadoop version {{>= 3}}. When trying to run it from 
> IntelliJ, I immediately ran into a class-not-found exception for 
> {{jdbm.helpers.CachePolicy}} and even after fixing this problem, the test 
> would not run because it complained about wrong security settings.
> I think we should check whether this test is at all working and if not, then 
> we should remove or replace it with something working.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8871:
-
Fix Version/s: (was: 1.7.0)
   1.8.0

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.8.0
>
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer count towards the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.
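For illustration, such a counterpart on the gateway could be shaped roughly like the sketch below; it is modelled after the existing trigger call and is not the actual Flink interface (names are placeholders):

{code:java}
// Illustrative sketch only, not the real task manager gateway.
public interface CheckpointingTaskGateway {

    /** Asks the task to start a checkpoint (this direction exists today). */
    void triggerCheckpoint(long checkpointId, long timestamp);

    /**
     * Proposed counterpart: tells the task that the checkpoint was cancelled
     * (e.g. it timed out), so a still running snapshot thread for this
     * checkpointId can be stopped and its resources released.
     */
    void cancelCheckpoint(long checkpointId);
}
{code}

The checkpoint coordinator would invoke {{cancelCheckpoint}} for all participating tasks whenever it abandons a pending checkpoint.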



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8871:
-
Affects Version/s: 1.7.0

> Checkpoint cancellation is not propagated to stop checkpointing threads on 
> the task manager
> ---
>
> Key: FLINK-8871
> URL: https://issues.apache.org/jira/browse/FLINK-8871
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0
>Reporter: Stefan Richter
>Priority: Critical
> Fix For: 1.8.0
>
>
> Flink currently lacks any form of feedback mechanism from the job manager / 
> checkpoint coordinator to the tasks when it comes to failing a checkpoint. 
> This means that running snapshots on the tasks are also not stopped even if 
> their owning checkpoint is already cancelled. Two examples for cases where 
> this applies are checkpoint timeouts and local checkpoint failures on a task 
> together with a configuration that does not fail tasks on checkpoint failure. 
> Notice that those running snapshots no longer count towards the maximum 
> number of parallel checkpoints, because their owning checkpoint is considered 
> cancelled.
> Not stopping the task's snapshot thread can lead to a problematic situation 
> where the next checkpoints already started, while the abandoned checkpoint 
> thread from a previous checkpoint is still lingering around running. This 
> scenario can potentially cascade: many parallel checkpoints will slow down 
> checkpointing and make timeouts even more likely.
>  
> A possible solution is introducing a {{cancelCheckpoint}} method  as 
> counterpart to the {{triggerCheckpoint}} method in the task manager gateway, 
> which is invoked by the checkpoint coordinator as part of cancelling the 
> checkpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9002) Add operators with input type that goes through Avro serialization (with schema/generic)

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9002:
-
Fix Version/s: 1.7.0

> Add operators with input type that goes through Avro serialization (with 
> schema/generic) 
> -
>
> Key: FLINK-9002
> URL: https://issues.apache.org/jira/browse/FLINK-9002
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8997) Add sliding window aggregation to the job

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8997:
-
Fix Version/s: 1.7.0

> Add sliding window aggregation to the job
> -
>
> Key: FLINK-8997
> URL: https://issues.apache.org/jira/browse/FLINK-8997
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>
> The test job should also test windowing. Sliding windows are probably the 
> most demanding form, so this would be a good pick for the test.
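For reference, a minimal sliding-window aggregation in the DataStream API could look like the sketch below (key position, window size and slide are placeholders; the input is assumed to already have timestamps and watermarks assigned):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: per-key sum over a 1 minute event-time window that slides every 10 seconds.
public class SlidingWindowSketch {

    public static DataStream<Tuple2<String, Long>> slidingSum(DataStream<Tuple2<String, Long>> input) {
        return input
            .keyBy(0)
            .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10)))
            .sum(1);
    }
}
{code}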



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8998) Add user-defined watermark assigner and timestamp extractor to the test job

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8998:
-
Fix Version/s: 1.7.0

> Add user-defined watermark assigner and timestamp extractor to the test job
> ---
>
> Key: FLINK-8998
> URL: https://issues.apache.org/jira/browse/FLINK-8998
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9003) Add operators with input type that goes through custom, stateful serialization

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9003:
-
Fix Version/s: 1.7.0

> Add operators with input type that goes through custom, stateful serialization
> --
>
> Key: FLINK-9003
> URL: https://issues.apache.org/jira/browse/FLINK-9003
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9001) Add operators with input type that goes through Kryo serialization (registered/generic/custom)

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9001:
-
Fix Version/s: 1.7.0

> Add operators with input type that goes through Kryo serialization 
> (registered/generic/custom) 
> ---
>
> Key: FLINK-9001
> URL: https://issues.apache.org/jira/browse/FLINK-9001
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-8995:
-
Fix Version/s: 1.7.0

> Add a test operator with keyed state that uses custom, stateful serializer
> --
>
> Key: FLINK-8995
> URL: https://issues.apache.org/jira/browse/FLINK-8995
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
> Fix For: 1.7.0
>
>
> This test should figure out problems in places where multiple threads would 
> share the same serializer instead of properly duplicating it.
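The property to exercise is that a stateful serializer is never shared across threads but duplicated. As a small illustration of that contract (using Flink's Kryo-based serializer, which holds a non-thread-safe Kryo instance; the test class name is made up):

{code:java}
import static org.junit.Assert.assertNotSame;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.junit.Test;

public class SerializerDuplicationSketchTest {

    @Test
    public void statefulSerializerMustBeDuplicatedPerThread() {
        TypeSerializer<Object> serializer = new KryoSerializer<>(Object.class, new ExecutionConfig());

        // duplicate() must hand out an independent instance before the
        // serializer is used by another thread; sharing "this" would be a bug.
        TypeSerializer<Object> forOtherThread = serializer.duplicate();

        assertNotSame(serializer, forOtherThread);
    }
}
{code}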



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9410:
-
Affects Version/s: 1.7.0

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: zhangminglei
>Priority: Critical
> Fix For: 1.8.0
>
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}}, 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non-blocking.
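For reference, wiring up Hadoop's asynchronous client looks roughly like the sketch below; container start results then arrive via callbacks instead of blocking the main thread. This only illustrates the Hadoop API and is not the actual YarnResourceManager change (class name and callback bodies are placeholders):

{code:java}
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

public class AsyncNodeManagerClientSketch implements NMClientAsync.CallbackHandler {

    private final NMClientAsync nmClientAsync;

    public AsyncNodeManagerClientSketch(Configuration yarnConfig) {
        // All container start/stop requests issued through this client are non-blocking.
        nmClientAsync = NMClientAsync.createNMClientAsync(this);
        nmClientAsync.init(yarnConfig);
        nmClientAsync.start();
    }

    @Override
    public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
        // e.g. hand the result back into the ResourceManager's main thread executor
    }

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable t) {
        // e.g. release the container and request a substitute
    }

    @Override
    public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {}

    @Override
    public void onGetContainerStatusError(ContainerId containerId, Throwable t) {}

    @Override
    public void onContainerStopped(ContainerId containerId) {}

    @Override
    public void onStopContainerError(ContainerId containerId, Throwable t) {}
}
{code}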



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-10543) Leverage efficient timer deletion in relational operators

2018-10-19 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-10543:
---

Assignee: Hequn Cheng

> Leverage efficient timer deletion in relational operators
> -
>
> Key: FLINK-10543
> URL: https://issues.apache.org/jira/browse/FLINK-10543
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Minor
>
> FLINK-9423 added support for efficient timer deletions. This feature has been 
> available since Flink 1.6 and should be used by the relational operators of 
> SQL and the Table API.
> Currently, we use a few workarounds to handle situations in which deleting 
> timers would be the better solution.
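As a reminder of the primitive that is now cheap to use: a keyed function can simply delete its previous clean-up timer when new data arrives, instead of keeping a "latest timer" marker in state and ignoring stale firings. A minimal sketch (not the actual operator code; the state name and the one-minute clean-up interval are placeholders):

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CleanupTimerSketch extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> registeredTimer;

    @Override
    public void open(Configuration parameters) {
        registeredTimer = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cleanup-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long previous = registeredTimer.value();
        if (previous != null) {
            // cheap since FLINK-9423: drop the old timer instead of working around it
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long cleanupTime = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(cleanupTime);
        registeredTimer.update(cleanupTime);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // the timer that fires here is always the current one
        registeredTimer.clear();
    }
}
{code}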



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9410:
-
Fix Version/s: (was: 1.7.0)
   1.8.0

> Replace NMClient with NMClientAsync in YarnResourceManager
> --
>
> Key: FLINK-9410
> URL: https://issues.apache.org/jira/browse/FLINK-9410
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: zhangminglei
>Priority: Critical
> Fix For: 1.8.0
>
>
> Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} 
> which is called from within the main thread of the {{ResourceManager}}. Since 
> these operations are blocking, we should replace the client with the 
> {{NMClientAsync}} and make the calls non blocking.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9587.
--
Resolution: Not A Problem

> ContinuousFileMonitoringFunction crashes on short living files
> --
>
> Key: FLINK-9587
> URL: https://issues.apache.org/jira/browse/FLINK-9587
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming, Streaming Connectors
>Affects Versions: 1.5.0
> Environment: Flink 1.5 running as a standalone cluster.
>Reporter: Andrei Shumanski
>Priority: Critical
> Fix For: 1.6.3, 1.7.0
>
>
> Hi,
>  
> We use Flink to monitor a directory for new files. The filesystem is a MapR 
> Fuse mount that looks like a local FS.
> The files are copied to the directory by another process that uses the rsync 
> command. While a file is not yet completely written, rsync creates a temporary 
> file with a name like ".file.txt.uM6MfZ", where the last extension is a random 
> string.
> When the copying is done, the file is renamed to its final name "file.txt".
>  
> The bug is that Flink does not correctly handle this behavior and does not 
> take into account that files in the directory might be deleted.
>  
> We are getting error traces:
> {code:java}
> java.io.FileNotFoundException: File 
> file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or 
> the user running Flink ('root') has insufficient permissions to access it.
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> In LocalFileSystem.listStatus(final Path f) we read the list of files in a 
> directory and then create a LocalFileStatus object for each of the files. But a 
> file might be removed during the interval between these two operations.
> I do not see any option to handle this exception in our code.
>  
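A defensive variant of such a listing loop would simply skip entries that disappear between the directory listing and the per-file stat. Sketch only, using plain java.io to keep it short; this is not the actual LocalFileSystem code:

{code:java}
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

public final class TolerantDirectoryListing {

    // Lists a directory and keeps only entries that still exist, so short-lived
    // files (e.g. rsync temp files being renamed) do not fail the whole scan.
    public static List<File> listExistingFiles(File directory) throws FileNotFoundException {
        File[] entries = directory.listFiles();
        if (entries == null) {
            throw new FileNotFoundException("Directory " + directory + " does not exist.");
        }
        List<File> result = new ArrayList<>();
        for (File entry : entries) {
            if (entry.exists()) {
                result.add(entry);
            }
        }
        return result;
    }
}
{code}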



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656921#comment-16656921
 ] 

Till Rohrmann commented on FLINK-9587:
--

I think we can close this issue due to inactivity.

> ContinuousFileMonitoringFunction crashes on short living files
> --
>
> Key: FLINK-9587
> URL: https://issues.apache.org/jira/browse/FLINK-9587
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming, Streaming Connectors
>Affects Versions: 1.5.0
> Environment: Flink 1.5 running as a standalone cluster.
>Reporter: Andrei Shumanski
>Priority: Critical
> Fix For: 1.6.3, 1.7.0
>
>
> Hi,
>  
> We use Flink to monitor a directory for new files. The filesystem is a MapR 
> Fuse mount that looks like a local FS.
> The files are copied to the directory by another process that uses the rsync 
> command. While a file is not yet completely written, rsync creates a temporary 
> file with a name like ".file.txt.uM6MfZ", where the last extension is a random 
> string.
> When the copying is done, the file is renamed to its final name "file.txt".
>  
> The bug is that Flink does not correctly handle this behavior and does not 
> take into account that files in the directory might be deleted.
>  
> We are getting error traces:
> {code:java}
> java.io.FileNotFoundException: File 
> file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or 
> the user running Flink ('root') has insufficient permissions to access it.
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115)
> at 
> org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177)
> at 
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710)
> at 
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242)
> at 
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> In LocalFileSystem.listStatus(final Path f) we read the list of files in a 
> directory and then create a LocalFileStatus object for each of the files. But a 
> file might be removed during the interval between these two operations.
> I do not see any option to handle this exception in our code.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10368) 'Kerberized YARN on Docker test' instable

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10368:
--
Fix Version/s: 1.6.3
   1.5.6

> 'Kerberized YARN on Docker test' instable
> -
>
> Key: FLINK-10368
> URL: https://issues.apache.org/jira/browse/FLINK-10368
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Running the Kerberized YARN on Docker end-to-end test failed on an AWS 
> instance. The problem seems to be that the NameNode went into safe mode due 
> to limited resources.
> {code}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-19 09:04:39,201 INFO  org.apache.hadoop.security.UserGroupInformation 
>   - Login successful for user hadoop-user using keytab file 
> /home/hadoop-user/hadoop-user.keytab
> 2018-09-19 09:04:39,453 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> master.docker-hadoop-cluster-network/172.22.0.3:8032
> 2018-09-19 09:04:39,640 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> master.docker-hadoop-cluster-network/172.22.0.3:10200
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,901 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=2000, 
> taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1}
> 2018-09-19 09:04:40,286 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both 
> LOG4J and Logback configuration files. Please delete or rename one of them.
> 
>  The program finished with the following exception:
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't 
> deploy Yarn session cluster
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420)
> at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:259)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot 
> create 
> file/user/hadoop-user/.flink/application_1537266361291_0099/lib/slf4j-log4j12-1.7.7.jar.
>  Name node is in safe mode.
> Resources are low on NN. Please add or free up more resources then turn off 
> safe mode manually. NOTE:  If you turn off safe mode before adding resources, 
> the NN will immediately return to safe mode. Use "hdfs dfsadmin -safemode 
> leave" to turn safe mode off. 
> NamenodeHostName:master.docker-hadoop-cluster-network
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.newSafemodeException(FSNamesystem.java:1407)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1395)
> at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2278)
> at 
> org.apache.hado

[jira] [Commented] (FLINK-10364) Test instability in NonHAQueryableStateFsBackendITCase#testMapState

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656917#comment-16656917
 ] 

Till Rohrmann commented on FLINK-10364:
---

Ping [~kkl0u]

> Test instability in NonHAQueryableStateFsBackendITCase#testMapState
> ---
>
> Key: FLINK-10364
> URL: https://issues.apache.org/jira/browse/FLINK-10364
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.5.4, 1.6.0
>Reporter: Nico Kruber
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> {code}
> Tests run: 12, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 14.365 sec 
> <<< FAILURE! - in 
> org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase
> testMapState(org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase)
>   Time elapsed: 0.253 sec  <<< ERROR!
> java.lang.NullPointerException: null
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testMapState(AbstractQueryableStateTestBase.java:840)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> {code}
> from https://api.travis-ci.org/v3/job/429797943/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656916#comment-16656916
 ] 

Till Rohrmann commented on FLINK-10491:
---

[~pnowojski] [~zjwang], is this something we want to change for Flink 1.7?

> Deadlock during spilling data in SpillableSubpartition 
> ---
>
> Key: FLINK-10491
> URL: https://issues.apache.org/jira/browse/FLINK-10491
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.4, 1.6.1
>Reporter: Piotr Nowojski
>Assignee: zhijiang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Originally reported here: 
> [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E]
> Thread dump (from 1.5.3 version) showing two deadlocked threads, because they 
> are taking two locks in different order:
> {noformat}
> Thread-1
> "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA 
> waiting for monitor entry
> waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a 
> java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
> - locked <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
> - locked <0x2da5> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
> at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
> at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
> at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> at java.lang.Thread.run(Thread.java:745)
> Thread-2
> "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor 
> entry
> java.lang.Thread.State: BLOCKED
> blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
> waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release 
> lock on <0x2dfd> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
> at 
> org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
> - locked <0x2dfb> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
> - locked <0x2dfc> (a 
> org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNew

[jira] [Updated] (FLINK-10481) Wordcount end-to-end test in docker env unstable

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10481:
--
Fix Version/s: 1.5.6

> Wordcount end-to-end test in docker env unstable
> 
>
> Key: FLINK-10481
> URL: https://issues.apache.org/jira/browse/FLINK-10481
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{Wordcount end-to-end test in docker env}} fails sometimes on Travis 
> with the following problem:
> {code}
> Status: Downloaded newer image for java:8-jre-alpine
>  ---> fdc893b19a14
> Step 2/16 : RUN apk add --no-cache bash snappy
>  ---> [Warning] IPv4 forwarding is disabled. Networking will not work.
>  ---> Running in 4329ebcd8a77
> fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
> WARNING: Ignoring 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: 
> temporary error (try again later)
> fetch 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
> WARNING: Ignoring 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: 
> temporary error (try again later)
> ERROR: unsatisfiable constraints:
>   bash (missing):
> required by: world[bash]
>   snappy (missing):
> required by: world[snappy]
> The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero 
> code: 2
> {code}
> https://api.travis-ci.org/v3/job/434909395/log.txt
> It seems as if it is related to 
> https://github.com/gliderlabs/docker-alpine/issues/264 and 
> https://github.com/gliderlabs/docker-alpine/issues/279.
> We might want to switch to a different base image to avoid these problems in 
> the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10481) Wordcount end-to-end test in docker env unstable

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-10481:
--
Fix Version/s: 1.6.3

> Wordcount end-to-end test in docker env unstable
> 
>
> Key: FLINK-10481
> URL: https://issues.apache.org/jira/browse/FLINK-10481
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{Wordcount end-to-end test in docker env}} fails sometimes on Travis 
> with the following problem:
> {code}
> Status: Downloaded newer image for java:8-jre-alpine
>  ---> fdc893b19a14
> Step 2/16 : RUN apk add --no-cache bash snappy
>  ---> [Warning] IPv4 forwarding is disabled. Networking will not work.
>  ---> Running in 4329ebcd8a77
> fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz
> WARNING: Ignoring 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: 
> temporary error (try again later)
> fetch 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz
> WARNING: Ignoring 
> http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: 
> temporary error (try again later)
> ERROR: unsatisfiable constraints:
>   bash (missing):
> required by: world[bash]
>   snappy (missing):
> required by: world[snappy]
> The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero 
> code: 2
> {code}
> https://api.travis-ci.org/v3/job/434909395/log.txt
> It seems as if it is related to 
> https://github.com/gliderlabs/docker-alpine/issues/264 and 
> https://github.com/gliderlabs/docker-alpine/issues/279.
> We might want to switch to a different base image to avoid these problems in 
> the future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10368) 'Kerberized YARN on Docker test' instable

2018-10-19 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656901#comment-16656901
 ] 

Till Rohrmann commented on FLINK-10368:
---

I encountered another problem:
{code}The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy 
Yarn session cluster
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:419)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:261)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The 
number of requested virtual cores per node 1 exceeds the maximum number of 
virtual cores 0 available in the Yarn Cluster. Please note that the number of 
virtual cores is set to the number of task slots by default unless configured 
in the Flink config with 'yarn.containers.vcores.'
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:299)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:491)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:412)
... 9 more
{code}
Apparently, the cluster thinks that it has only {{0}} vcores. This might be a 
setup issue.

> 'Kerberized YARN on Docker test' instable
> -
>
> Key: FLINK-10368
> URL: https://issues.apache.org/jira/browse/FLINK-10368
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.7.0
>
>
> Running the Kerberized YARN on Docker end-to-end test failed on an AWS 
> instance. The problem seems to be that the NameNode went into safe mode due 
> to limited resources.
> {code}
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2018-09-19 09:04:39,201 INFO  org.apache.hadoop.security.UserGroupInformation 
>   - Login successful for user hadoop-user using keytab file 
> /home/hadoop-user/hadoop-user.keytab
> 2018-09-19 09:04:39,453 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> master.docker-hadoop-cluster-network/172.22.0.3:8032
> 2018-09-19 09:04:39,640 INFO  org.apache.hadoop.yarn.client.AHSProxy  
>   - Connecting to Application History server at 
> master.docker-hadoop-cluster-network/172.22.0.3:10200
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,656 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - No path for the flink jar passed. Using the location of class 
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2018-09-19 09:04:39,901 INFO  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Cluster 
> specification: ClusterSpecification{masterMemoryMB=2000, 
> taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1}
> 2018-09-19 09:04:40,286 WARN  
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - The 
> configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both 
> LOG4J and Logback configuration files. Please delete or rename one of them.
> 
>  The program finished with the following exception:
> o

[jira] [Commented] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints

2018-10-19 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656892#comment-16656892
 ] 

Gary Yao commented on FLINK-10482:
--

While working on FLINK-10309, I discovered FLINK-10615, which can also cause 
the issue described here. If I recall correctly, the reason is that a pending 
checkpoint may be [aborted multiple 
times|https://github.com/apache/flink/blob/d2480af40f5145d865196a95ec58a415482d8cff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L512],
 which messes up the statistics.
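In other words, the abort path needs to be idempotent so the in-progress counter and the other statistics are updated at most once per pending checkpoint. A minimal sketch of such a guard (illustrative only, not the actual PendingCheckpoint code):

{code:java}
// Repeated aborts of the same pending checkpoint must not update the
// statistics more than once.
public class AbortOnceGuard {

    private boolean discarded; // in the real code this is read/written under the checkpoint lock

    public boolean markDiscarded() {
        if (discarded) {
            return false; // already aborted, skip any further bookkeeping
        }
        discarded = true;
        return true;
    }
}

// Usage in the abort handling (names illustrative):
//
//     if (guard.markDiscarded()) {
//         reportCheckpointFailureToStatsTracker();
//     }
{code}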

> java.lang.IllegalArgumentException: Negative number of in progress checkpoints
> --
>
> Key: FLINK-10482
> URL: https://issues.apache.org/jira/browse/FLINK-10482
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.1
>Reporter: Julio Biason
>Priority: Major
> Fix For: 1.8.0
>
>
> Recently I found the following entry in my JobManager log:
> {noformat}
> 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  - Implementation 
> error: Unhandled exception.
>  java.lang.IllegalArgumentException: Negative number of in progress 
> checkpoints
>      at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.(CheckpointStatsCounts.java:72)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177)
>      at 
> org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166)
>      at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553)
>      at 
> org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340)
>      at 
> org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923)
>      at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) 
>   
>      at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>      at java.lang.reflect.Method.invoke(Method.java:498)  
>    
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>   
>     
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
>      at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>   
>   
>      at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>   
>    
>      at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>   
>    
>      at akka.actor.Actor$class.aroundReceive(Actor.scala:502) 
>   
>    
>      at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)  
>   
>    
>      at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)  
>      
>      at akka.actor.ActorCell.invoke(ActorCell.scala:495)     
>      at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)   
>   
>    
>      at akka.dispatch.Mailbox.run(Mailbox.scala:224)    
>      at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)  
>   
>    
>      at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   

[jira] [Commented] (FLINK-10601) Make user home dir consistent with Flink default filesystem

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656883#comment-16656883
 ] 

ASF GitHub Bot commented on FLINK-10601:


link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir 
consistent with Flink default filesystem
URL: https://github.com/apache/flink/pull/6887#issuecomment-431389044
 
 
   I'd like to add some tests, but it seems hard to expose the state of YARN 
containers in a unit test, and writing an integration test for this would 
probably not be worth it. I might need some suggestions on this. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make user home dir consistent with Flink default filesystem
> ---
>
> Key: FLINK-10601
> URL: https://issues.apache.org/jira/browse/FLINK-10601
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.1
>Reporter: Paul Lin
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the Flink client uses the raw default Hadoop filesystem to upload 
> local files, and this can be problematic when a non-default filesystem is used 
> for HA or checkpointing in delegation-token scenarios. The jobmanager only has 
> the delegation tokens for the default FS, so it gets authentication errors when 
> trying to connect to a non-default filesystem.
> So I propose to replace the default FS property in the yarn configuration with 
> the Flink filesystem property `fs.default-scheme` on the client side 
> (AbstractYarnClusterDescriptor), to avoid this problem and also make the 
> client behavior more configurable.
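A hedged sketch of that client-side override, using the Hadoop key fs.defaultFS and Flink's fs.default-scheme option (the helper class and the "skip local schemes" check are illustrative, not the final patch):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

public final class DefaultFsOverride {

    // If the user configured fs.default-scheme in flink-conf.yaml, use it as the
    // default filesystem for the YARN client as well, so the staging directory and
    // the delegation tokens refer to the same filesystem.
    public static void applyTo(org.apache.hadoop.conf.Configuration yarnConfig, Configuration flinkConfig) {
        String flinkDefaultScheme = flinkConfig.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME);
        if (flinkDefaultScheme != null && !flinkDefaultScheme.startsWith("file:")) {
            yarnConfig.set("fs.defaultFS", flinkDefaultScheme);
        }
    }
}
{code}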



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem

2018-10-19 Thread GitBox
link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir 
consistent with Flink default filesystem
URL: https://github.com/apache/flink/pull/6887#issuecomment-431389044
 
 
   I'd like to add some tests, but it seems hard to expose the state of YARN 
containers in a unit test, and writing an integration test for this would 
probably not be worth it. I might need some suggestions on this. Thanks!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10601) Make user home dir consistent with Flink default filesystem

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656881#comment-16656881
 ] 

ASF GitHub Bot commented on FLINK-10601:


link3280 opened a new pull request #6887: [FLINK-10601] [YARN] Make user home 
dir consistent with Flink default filesystem
URL: https://github.com/apache/flink/pull/6887
 
 
   ## What is the purpose of the change
   
   Currently, the Flink client uses the raw default Hadoop filesystem to upload 
local files, and this could be problematic when using a non-default filesystem 
for HA or checkpointing in delegation token scenarios. The jobmanager only has 
the delegation tokens for the default FS, so it gets authentication errors when 
trying to connect to a non-default filesystem.
   
   So I propose to make the default FS consistent with the Flink filesystem 
property fs.default-scheme (if it's set) on the client side 
(AbstractYarnClusterDescriptor), to avoid this problem and also make the client 
behavior more configurable.
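   
   A minimal sketch of the idea, hypothetical rather than the actual patch (the helper class and method below are made up for illustration; only the `fs.default-scheme` option, exposed as CoreOptions.DEFAULT_FILESYSTEM_SCHEME, and Hadoop's `fs.defaultFS` key are existing names):
   
   ```java
   // Hypothetical helper, not part of this PR: point the Hadoop configuration's
   // default FS at Flink's configured default filesystem scheme before files are
   // uploaded, so that uploads and delegation tokens target the same filesystem.
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.CoreOptions;
   
   final class DefaultFsAlignmentSketch {
   
       static void alignDefaultFs(
               Configuration flinkConfig,
               org.apache.hadoop.conf.Configuration yarnConfig) {
   
           final String defaultScheme = flinkConfig.getString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME);
           // Only override when a non-local default scheme is configured in flink-conf.yaml.
           if (defaultScheme != null && !defaultScheme.startsWith("file:")) {
               yarnConfig.set("fs.defaultFS", defaultScheme);
           }
       }
   }
   ```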
   
   ## Brief change log
   
   - Replace the fs.defaultFS property in the YARN configuration with the 
initialized Flink filesystem's default scheme if it's not local.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Set the default fs in core-site.xml to a non-existent fs, and set the 
fs.default-scheme property to a valid one in flink-conf.yaml, then run the 
WordCount example to see if it works well. If things go wrong, there would be 
errors while the client tries to upload the files to the remote filesystem.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make user home dir consistent with Flink default filesystem
> ---
>
> Key: FLINK-10601
> URL: https://issues.apache.org/jira/browse/FLINK-10601
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.6.1
>Reporter: Paul Lin
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the Flink client uses the raw default Hadoop filesystem to upload 
> local files, and this could be problematic when using a non-default 
> filesystem for HA or checkpointing in delegation token scenarios. The 
> jobmanager only has the delegation tokens for the default FS, so it gets 
> authentication errors when trying to connect to a non-default filesystem.
> So I propose to replace the default FS property in the YARN configuration with 
> the Flink filesystem property `fs.default-scheme` on the client side 
> (AbstractYarnClusterDescriptor), to avoid this problem and also make the 
> client behavior more configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10275) StreamTask support object reuse

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10275.
---
   Resolution: Won't Fix
Fix Version/s: (was: 1.7.0)

> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>
> StreamTask should support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful not to keep any objects as state and not to modify the objects.
> *FYI, now this thread is blocked by FLIP-21. The pull request attached is 
> closed for a needed rework.*
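
A minimal sketch of what this constraint means for user code (the job and classes below are illustrative, not from the issue; ExecutionConfig#enableObjectReuse() is the existing switch): with object reuse enabled, the runtime may hand the same record instance to a function repeatedly, so a function must copy a record before keeping it and must not mutate inputs it does not own.

{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;

public class ObjectReuseSketch {

	static class Event {
		long id;

		Event copy() {
			Event c = new Event();
			c.id = this.id;
			return c;
		}
	}

	// Keeps the last seen event: it must store a defensive copy, because the
	// 'value' instance may be mutated and handed out again by the runtime.
	static class LastSeen implements MapFunction<Event, Long> {
		private Event lastSeen;

		@Override
		public Long map(Event value) {
			lastSeen = value.copy();
			return lastSeen.id;
		}
	}

	public static void main(String[] args) {
		ExecutionConfig config = new ExecutionConfig();
		config.enableObjectReuse();
	}
}
{code}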



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10275) StreamTask support object reuse

2018-10-19 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-10275:
---

> StreamTask support object reuse
> ---
>
> Key: FLINK-10275
> URL: https://issues.apache.org/jira/browse/FLINK-10275
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>
> StreamTask should support efficient object reuse. The purpose behind this is to 
> reduce pressure on the garbage collector.
> All objects are reused, without backup copies. The operators and UDFs must be 
> careful not to keep any objects as state and not to modify the objects.
> *FYI, now this thread is blocked by FLIP-21. The pull request attached is 
> closed for a needed rework.*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10605) Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot interface

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656874#comment-16656874
 ] 

ASF GitHub Bot commented on FLINK-10605:


igalshilman commented on a change in pull request #6881: [FLINK-10605] [core] 
Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot 
interface
URL: https://github.com/apache/flink/pull/6881#discussion_r226672698
 
 

 ##
 File path: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@code Avro} specific implementation of a {@link TypeSerializerSnapshot}.
+ *
+ * @param <T> The data type that the originating serializer of this configuration serializes.
+ */
+public final class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {
+   private Class<T> runtimeType;
+   private Schema schema;
+
+   @SuppressWarnings("WeakerAccess")
+   public AvroSerializerSnapshot() {
+   // this constructor is used when restoring from a checkpoint.
+   }
+
+   AvroSerializerSnapshot(Schema schema, Class<T> runtimeType) {
+   this.schema = schema;
+   this.runtimeType = runtimeType;
+   }
+
+   @Override
+   public int getCurrentVersion() {
+   return 1;
+   }
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   checkNotNull(runtimeType);
+   checkNotNull(schema);
+
+   out.writeUTF(runtimeType.getName());
+   out.writeUTF(schema.toString(false));
+   }
+
+   @Override
+   public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+   final String previousRuntimeClassName = in.readUTF();
+   final String previousSchemaDefinition = in.readUTF();
+
+   this.runtimeType = findClassOrThrow(userCodeClassLoader, previousRuntimeClassName);
+   this.schema = parseAvroSchema(previousSchemaDefinition);
+   }
+
+   @Override
+   public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
+   resolveSchemaCompatibility(NS newSerializer) {
+   if (!(newSerializer instanceof AvroSerializer)) {
+   return TypeSerializerSchemaCompatibility.incompatible();
+   }
+   AvroSerializer<T> newAvroSerializer = (AvroSerializer<T>) newSerializer;
+   return resolveSchemaCompatibility(schema, newAvroSerializer.getAvroSchema());
+   }
+
+   @Override
+   public TypeSerializer<T> restoreSerializer() {
+   checkNotNull(runtimeType);
+   checkNotNull(schema);
+
+   if (AvroSerializer.isGenericRecord(runtimeType)) {
+   return new AvroSerializer<>(runtimeType, schema);
+   }
+   else {
+   return new AvroSerializer<>(runtimeType);
+   }
+   }
+
+   // ------------------------------------------------------------------------
+   // Helpers
+   // ------------------------------------------------------------------------
+
+   /**
+* Resolves writer/reader schema compatibility.
+*
+* Checks whenever a new version of 

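
The compatibility resolution above ultimately defers to Avro's own schema resolution rules. A small standalone sketch of that check (the class name and the boolean mapping are assumptions for illustration, not this PR's code):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

final class AvroCompatibilityCheckSketch {

	/**
	 * Returns true if data written with writerSchema can be read with readerSchema
	 * according to Avro's schema resolution rules.
	 */
	static boolean canReadWith(Schema writerSchema, Schema readerSchema) {
		SchemaPairCompatibility compatibility =
			SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);
		return compatibility.getType() == SchemaCompatibilityType.COMPATIBLE;
	}
}
```

For example, a reader schema that adds a new field with a default value stays COMPATIBLE with the old writer schema, while a reader that requires a field the writer no longer provides (and that has no default) makes the pair INCOMPATIBLE.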