[jira] [Assigned] (FLINK-10463) Null literal cannot be properly parsed in Java Table API function call
[ https://issues.apache.org/jira/browse/FLINK-10463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10463: --- Assignee: Hequn Cheng > Null literal cannot be properly parsed in Java Table API function call > -- > > Key: FLINK-10463 > URL: https://issues.apache.org/jira/browse/FLINK-10463 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Xingcan Cui >Assignee: Hequn Cheng >Priority: Major > > For example, the expression `Null(STRING).regexpReplace('oo|ar', '')` throws > the following exception. > {code:java} > org.apache.flink.table.api.ExpressionParserException: Could not parse > expression at column 13: string matching regex > `(?i)\Qas\E(?![_$\p{javaJavaIdentifierPart}])' expected but `.' found > Null(STRING).regexpReplace('oo|ar', '') > ^ > at > org.apache.flink.table.expressions.ExpressionParser$.throwError(ExpressionParser.scala:576) > at > org.apache.flink.table.expressions.ExpressionParser$.parseExpression(ExpressionParser.scala:569) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10205) Batch Job: InputSplit Fault tolerant for DataSourceTask
[ https://issues.apache.org/jira/browse/FLINK-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657704#comment-16657704 ] ASF GitHub Bot commented on FLINK-10205: TisonKun commented on issue #6684: [FLINK-10205] Batch Job: InputSplit Fault tolerant for DataSource… URL: https://github.com/apache/flink/pull/6684#issuecomment-431546174 FYI there was another [discussion(FLINK-10038)](https://issues.apache.org/jira/browse/FLINK-10038) on `InputSplit` (creation and) assignment. I prefer the idea of moving the whole `InputSplit` assignment into a single task. It could get splits auto-checkpointed and simplify the currently complex JM. However, new nodes might introduce unexpected features to the graph, and besides we would have to implement some "pull" semantics for split nodes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Batch Job: InputSplit Fault tolerant for DataSourceTask > --- > > Key: FLINK-10205 > URL: https://issues.apache.org/jira/browse/FLINK-10205 > Project: Flink > Issue Type: Sub-task > Components: JobManager >Affects Versions: 1.6.1, 1.6.2, 1.7.0 >Reporter: JIN SUN >Assignee: JIN SUN >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Today the DataSourceTask pulls InputSplits from the JobManager to achieve better > performance; however, when a DataSourceTask fails and reruns, it will not get > the same splits as its previous attempt. This will introduce inconsistent > results or even data corruption. > Furthermore, if two executions run at the same time (in the batch > scenario), these two executions should process the same splits. > We need to fix this issue to make the inputs of a DataSourceTask > deterministic. 
The proposal is to save all splits into the ExecutionVertex, and the > DataSourceTask will pull splits from there. > document: > [https://docs.google.com/document/d/1FdZdcA63tPUEewcCimTFy9Iz2jlVlMRANZkO4RngIuk/edit?usp=sharing] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
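The proposal above can be sketched as follows. This is a hypothetical illustration only (class and method names are invented, not taken from the actual PR or the Flink code base): the vertex records the splits it has handed out, so a restarted attempt replays the same sequence instead of pulling fresh, possibly different, splits from the JobManager.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposal: record which splits a vertex has
// handed out, so a restarted attempt gets the same sequence again.
public class SplitHistory {
    private final List<String> assigned = new ArrayList<>(); // splits already handed out
    private int cursor = 0; // read position of the current attempt

    // Replays history first; only pulls a fresh split once history is exhausted.
    public synchronized String nextSplit(List<String> source) {
        if (cursor < assigned.size()) {
            return assigned.get(cursor++); // replay for a restarted attempt
        }
        if (assigned.size() < source.size()) {
            String split = source.get(assigned.size());
            assigned.add(split);
            cursor++;
            return split;
        }
        return null; // no more splits
    }

    // A failed attempt restarts reading from the beginning of its history.
    public synchronized void resetForRestart() {
        cursor = 0;
    }
}
```

A restarted attempt thus processes exactly the splits of its predecessor before consuming new ones, which is the deterministic-input property the issue asks for.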
[jira] [Commented] (FLINK-10558) Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657694#comment-16657694 ] TisonKun commented on FLINK-10558: -- [~till.rohrmann] I'd like to know whether the present status is expected, as [~yanghua] commented on GitHub. bq. @tillrohrmann For test case testTaskManagerFailure, the current test mode (based on stdout and stderr's special print message) cannot verify the Flip-6 scene. Because it doesn't pre-start a TM, I can only submit a job to drive the TM registration, but this will cause the job's output to flush out Flink's output, since they share stdout and stderr. Therefore, many judgments will not take effect. I wonder whether, in FLIP-6 YARN session mode, we'd like to (really) pre-start TMs; if so, we have to mark that the workaround (submitting a job to drive the TM registration) is temporary. > Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new > code base > --- > > Key: FLINK-10558 > URL: https://issues.apache.org/jira/browse/FLINK-10558 > Project: Flink > Issue Type: Sub-task >Reporter: vinoyang >Assignee: TisonKun >Priority: Minor > > {{YARNHighAvailabilityITCase}}, > {{YARNSessionCapacitySchedulerITCase#testClientStartup,}} > {{YARNSessionCapacitySchedulerITCase#testTaskManagerFailure}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10461) Speed up download file procedure when restore
[ https://issues.apache.org/jira/browse/FLINK-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657653#comment-16657653 ] ASF GitHub Bot commented on FLINK-10461: klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#issuecomment-431539541 Hi @azagrebin, could you please help to review this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Speed up download file procedure when restore > -- > > Key: FLINK-10461 > URL: https://issues.apache.org/jira/browse/FLINK-10461 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > Labels: pull-request-available > > In the current master branch, the restore will download files from DFS; the > download procedure is single-threaded, and could be sped up by using > multiple threads to download states from DFS. > > In my company, the states come to some terabytes, so the restore > procedure becomes a little slow. After a bit of digging, I found that states are > downloaded from DFS by a single thread, and multiple threads could be used to speed > this up. > I tested the time needed to download states from DFS with ~2 terabytes of states. > With a single thread it took 640+s, versus 130+s when using 5 threads for the download. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
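The reported speed-up (640+s with one thread vs. 130+s with five) comes from fanning the per-file downloads out over a thread pool. A minimal self-contained sketch of that pattern follows; it is not the actual PR code, and `downloadOne` is a trivial stand-in for the real DFS read:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch only: restore-time downloads submitted to a
// fixed-size thread pool instead of run in a single sequential loop.
public class ParallelDownloader {
    // Stand-in for the real DFS read; here it just tags its input.
    static String downloadOne(String remotePath) {
        return "local:" + remotePath;
    }

    public static List<String> downloadAll(List<String> remotePaths, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String path : remotePaths) {
                futures.add(pool.submit(() -> downloadOne(path))); // one task per file
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    results.add(f.get()); // preserves submission order; surfaces failures
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException("download failed", e);
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Since restore is I/O-bound, a pool several times larger than the core count can pay off, which matches the 5-thread figure quoted in the issue.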
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657639#comment-16657639 ] ASF GitHub Bot commented on FLINK-10527: yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase URL: https://github.com/apache/flink/pull/6816#issuecomment-431537500 failed case: testFlinkKafkaProducer10FailTransactionCoordinatorBeforeNotify, log file: https://api.travis-ci.org/v3/job/443777257/log.txt This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cleanup constant isNewMode in YarnTestBase > -- > > Key: FLINK-10527 > URL: https://issues.apache.org/jira/browse/FLINK-10527 > Project: Flink > Issue Type: Sub-task > Components: YARN >Reporter: vinoyang >Assignee: vinoyang >Priority: Major > Labels: pull-request-available > > This seems to be a residual problem from FLINK-10396; the constant was set to true in > that PR. Currently it has three usage scenarios: > 1. an assert, which causes an error > {code:java} > assumeTrue("The new mode does not start TMs upfront.", !isNewMode); > {code} > 2. if (!isNewMode): the logic in the block is never invoked, so the if > block can be removed > 3. if (isNewMode): always invoked, so the if statement can be removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10619) config MemoryStateBackend default value
[ https://issues.apache.org/jira/browse/FLINK-10619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657627#comment-16657627 ] yangxiaoshuo commented on FLINK-10619: -- If it is allowed, I would like to fix it. > config MemoryStateBackend default value > --- > > Key: FLINK-10619 > URL: https://issues.apache.org/jira/browse/FLINK-10619 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: yangxiaoshuo >Assignee: yangxiaoshuo >Priority: Minor > Labels: starter > > The default MAX_STATE_SIZE of MemoryStateBackend is 5 * 1024 * 1024. > If we want to change it, the only way is > {code:java} > // code placeholder > env.setStateBackend(new MemoryStateBackend(1024 * 1024 * 1024));{code} > Is that right? > So shall we add it to the configuration file? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10619) config MemoryStateBackend default value
yangxiaoshuo created FLINK-10619: Summary: config MemoryStateBackend default value Key: FLINK-10619 URL: https://issues.apache.org/jira/browse/FLINK-10619 Project: Flink Issue Type: Improvement Components: Configuration Reporter: yangxiaoshuo Assignee: yangxiaoshuo The default MAX_STATE_SIZE of MemoryStateBackend is 5 * 1024 * 1024. If we want to change it, the only way is {code:java} // code placeholder env.setStateBackend(new MemoryStateBackend(1024 * 1024 * 1024));{code} Is that right? So shall we add it to the configuration file? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10618) Introduce catalog for Flink tables
Xuefu Zhang created FLINK-10618: --- Summary: Introduce catalog for Flink tables Key: FLINK-10618 URL: https://issues.apache.org/jira/browse/FLINK-10618 Project: Flink Issue Type: New Feature Components: SQL Client Affects Versions: 1.6.1 Reporter: Xuefu Zhang Assignee: Xuefu Zhang Besides meta objects such as tables that may come from an {{ExternalCatalog}}, Flink also deals with tables/views/functions that are created on the fly (in memory) or specified in a configuration file. Those objects don't belong to any {{ExternalCatalog}}, yet Flink either stores them in memory, which is non-persistent, or recreates them from a file, which is a big pain for the user. Those objects are known only to Flink, but Flink manages them poorly. Since they are typical objects in a database catalog, it's natural to have a catalog that manages them. The interface will be similar to {{ExternalCatalog}}, which contains meta objects that are not managed by Flink. There are several possible implementations of the Flink internal catalog interface: memory, file, external registry (such as Confluent Schema Registry or the Hive metastore), relational database, etc. The initial functionality as well as the catalog hierarchy could be very simple. The basic functionality of the catalog will mostly be creating, altering, and dropping tables, views, functions, etc. Obviously, this can evolve over time. We plan to provide implementations backed by memory, file, and the Hive metastore, which will be plugged in at the SQL Client layer. Please provide your feedback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
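As a rough illustration of the basic create/alter/drop functionality described above, here is a hypothetical in-memory sketch. All names are invented for this example and do not reflect the eventual Flink interface; table definitions are reduced to plain strings:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory catalog sketch: named meta objects with
// create/alter/drop semantics, the simplest of the proposed backends.
public class InMemoryCatalog {
    private final Map<String, String> tables = new HashMap<>(); // name -> definition

    public void createTable(String name, String definition) {
        if (tables.putIfAbsent(name, definition) != null) {
            throw new IllegalArgumentException("Table already exists: " + name);
        }
    }

    public void alterTable(String name, String newDefinition) {
        if (tables.replace(name, newDefinition) == null) {
            throw new IllegalArgumentException("No such table: " + name);
        }
    }

    public void dropTable(String name) {
        if (tables.remove(name) == null) {
            throw new IllegalArgumentException("No such table: " + name);
        }
    }

    public String getTable(String name) {
        return tables.get(name); // null if absent
    }
}
```

A file- or metastore-backed implementation would expose the same operations, which is what makes the catalog pluggable at the SQL Client layer.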
[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout
Elias Levy created FLINK-10617: -- Summary: Restoring job fails because of slot allocation timeout Key: FLINK-10617 URL: https://issues.apache.org/jira/browse/FLINK-10617 Project: Flink Issue Type: Bug Components: ResourceManager, TaskManager Affects Versions: 1.6.1 Reporter: Elias Levy The following may be related to FLINK-9932, but I am unsure. If you believe it is, go ahead and close this issue as a duplicate. While trying to test local state recovery on a job with large state, the job failed to be restored because slot allocation timed out. The job is running on a standalone cluster with 12 nodes and 96 task slots (8 per node). The job has a parallelism of 96, so it consumes all of the slots, and has ~200 GB of state in RocksDB. To test local state recovery I decided to kill one of the TMs. The TM immediately restarted and re-registered with the JM. I confirmed the JM showed 96 registered task slots. {noformat} 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms) 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 
21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d. 
21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms) 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,688 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,688 INFO org.apache.flink.runtim
[jira] [Commented] (FLINK-9761) Potential buffer leak in PartitionRequestClientHandler during job failures
[ https://issues.apache.org/jira/browse/FLINK-9761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657306#comment-16657306 ] Till Rohrmann commented on FLINK-9761: -- [~NicoK] could you verify whether this is really a problem? > Potential buffer leak in PartitionRequestClientHandler during job failures > -- > > Key: FLINK-9761 > URL: https://issues.apache.org/jira/browse/FLINK-9761 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Nico Kruber >Priority: Critical > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > {{PartitionRequestClientHandler#stagedMessages}} may be accessed from > multiple threads: > 1) Netty's IO thread > 2) During cancellation, when > {{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} is > called > If {{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} > thinks {{stagedMessages}} is empty, it will not install the > {{stagedMessagesHandler}} that consumes and releases buffers from received > messages. > Unless some unexpected combination of code calls prevents this from > happening, this would leak the non-recycled buffers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8418) Kafka08ITCase.testStartFromLatestOffsets() times out on Travis
[ https://issues.apache.org/jira/browse/FLINK-8418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657305#comment-16657305 ] Till Rohrmann commented on FLINK-8418: -- Do you think we can close this issue since it has not reappeared, [~tzulitai]? > Kafka08ITCase.testStartFromLatestOffsets() times out on Travis > -- > > Key: FLINK-8418 > URL: https://issues.apache.org/jira/browse/FLINK-8418 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Instance: https://travis-ci.org/kl0u/flink/builds/327733085 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
[ https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8073. -- Resolution: Cannot Reproduce Fix Version/s: (was: 1.5.6) (was: 1.6.3) (was: 1.7.0) Problem did not reappear for quite some time. Closing. If this issue reappears, then we should reopen this issue. > Test instability > FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint() > - > > Key: FLINK-8073 > URL: https://issues.apache.org/jira/browse/FLINK-8073 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.4.0, 1.5.0 >Reporter: Kostas Kloudas >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8452) BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter instable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8452. -- Resolution: Cannot Reproduce Fix Version/s: (was: 1.5.6) (was: 1.6.3) (was: 1.7.0) Problem did not reappear for quite some time. > BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter instable on > Travis > > > Key: FLINK-8452 > URL: https://issues.apache.org/jira/browse/FLINK-8452 > Project: Flink > Issue Type: Bug > Components: DataStream API, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{BucketingSinkTest.testNonRollingAvroKeyValueWithCompressionWriter}} > seems to be instable on Travis: > > https://travis-ci.org/tillrohrmann/flink/jobs/330261310 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8408) YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8408. -- Resolution: Cannot Reproduce Fix Version/s: (was: 1.5.6) (was: 1.6.3) (was: 1.7.0) Closing because the problem did not occur for quite some time. > YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n unstable on Travis > -- > > Key: FLINK-8408 > URL: https://issues.apache.org/jira/browse/FLINK-8408 > Project: Flink > Issue Type: Bug > Components: FileSystem, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{YarnFileStageTestS3ITCase.testRecursiveUploadForYarnS3n}} is unstable > on Travis. > https://travis-ci.org/tillrohrmann/flink/jobs/327216460 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8779) ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-8779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8779. -- Resolution: Cannot Reproduce Fix Version/s: (was: 1.5.6) (was: 1.6.3) The test case has been ported to the Flip-6 code base. If the problem should reoccur, then we should reopen this issue. > ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis > > > Key: FLINK-8779 > URL: https://issues.apache.org/jira/browse/FLINK-8779 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{ClassLoaderITCase.testKMeansJobWithCustomClassLoader}} fails on Travis > by producing no output for 300s. This might indicate a test instability or a > problem with Flink which was recently introduced. > https://api.travis-ci.org/v3/job/344427688/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8034) ProcessFailureCancelingITCase.testCancelingOnProcessFailure failing on Travis
[ https://issues.apache.org/jira/browse/FLINK-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-8034. -- Resolution: Not A Problem Fix Version/s: (was: 1.5.6) (was: 1.6.3) (was: 1.7.0) The test case has been ported to the flip-6 code base. If the problem should reoccur, we can reopen this issue. > ProcessFailureCancelingITCase.testCancelingOnProcessFailure failing on Travis > - > > Key: FLINK-8034 > URL: https://issues.apache.org/jira/browse/FLINK-8034 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > The {{ProcessFailureCancelingITCase.testCancelingOnProcessFailure}} is > failing on Travis spuriously. > https://travis-ci.org/tillrohrmann/flink/jobs/299075703 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8779) ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis
[ https://issues.apache.org/jira/browse/FLINK-8779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8779: - Fix Version/s: (was: 1.7.0) > ClassLoaderITCase.testKMeansJobWithCustomClassLoader fails on Travis > > > Key: FLINK-8779 > URL: https://issues.apache.org/jira/browse/FLINK-8779 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3 > > > The {{ClassLoaderITCase.testKMeansJobWithCustomClassLoader}} fails on Travis > by producing no output for 300s. This might indicate a test instability or a > problem with Flink which was recently introduced. > https://api.travis-ci.org/v3/job/344427688/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7351) test instability in JobClientActorRecoveryITCase#testJobClientRecovery
[ https://issues.apache.org/jira/browse/FLINK-7351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657237#comment-16657237 ] Till Rohrmann commented on FLINK-7351: -- No longer relevant for 1.7 since we disabled the legacy mode in this version. > test instability in JobClientActorRecoveryITCase#testJobClientRecovery > -- > > Key: FLINK-7351 > URL: https://issues.apache.org/jira/browse/FLINK-7351 > Project: Flink > Issue Type: Bug > Components: Job-Submission, Tests >Affects Versions: 1.3.2, 1.4.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3 > > > On a 16-core VM, the following test failed during {{mvn clean verify}} > {code} > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 22.814 sec > <<< FAILURE! - in org.apache.flink.runtime.client.JobClientActorRecoveryITCase > testJobClientRecovery(org.apache.flink.runtime.client.JobClientActorRecoveryITCase) > Time elapsed: 21.299 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. 
Resources available to scheduler: Number of instances=0, total > number of slots=0, available slots=0 > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:334) > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:139) > at > org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:368) > at > org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:309) > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:450) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleLazy(ExecutionGraph.java:834) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:814) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1425) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code}
[jira] [Resolved] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase
[ https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-9764.
----------------------------------
    Resolution: Cannot Reproduce

So far this issue has never been reported again and could not be reproduced. Thus, I would close it for the moment. If it reoccurs, please reopen this issue.

> Failure in LocalRecoveryRocksDBFullITCase
> -----------------------------------------
>
>                 Key: FLINK-9764
>                 URL: https://issues.apache.org/jira/browse/FLINK-9764
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing, Streaming, Tests
>    Affects Versions: 1.6.0
>            Reporter: Nico Kruber
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.6.3, 1.7.0
>
> {code}
> Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
> Starting null#executeTest.
> org.apache.flink.runtime.client.JobExecutionException: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but was:<1209>
> 	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
> 	at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
> 	at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
> 	at org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
> 	at org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
> 	at org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> 	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
> 	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
> 	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
> 	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
> 	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
> 	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
> 	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but was:<1209>
> 	at org.junit.Assert.fail(Assert.java:88)
> 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 	at org.junit.Assert.assertEquals(Assert.java:645)
> 	at org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
> 	at org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
> 	at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:5
[jira] [Resolved] (FLINK-10614) Update test_batch_allround.sh e2e to new testing infrastructure
[ https://issues.apache.org/jira/browse/FLINK-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-10614.
-----------------------------------
    Resolution: Fixed

Fixed via
1.7.0: https://github.com/apache/flink/commit/0fd7bd4f560f61077f72638a69a9102e33841365
1.6.3: https://github.com/apache/flink/commit/c0c9f39504a207d60d43cd7eea206acffe2cf4da
1.5.6: https://github.com/apache/flink/commit/b0b1548d5476d8a1e3ccf89a180773e725b8e5f6

> Update test_batch_allround.sh e2e to new testing infrastructure
> ---------------------------------------------------------------
>
>                 Key: FLINK-10614
>                 URL: https://issues.apache.org/jira/browse/FLINK-10614
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{test_batch_allround.sh}} still performs cleanup tasks that are also handled by {{test-runner-common.sh}}. This is no longer needed and should be the responsibility of the latter.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] tillrohrmann closed pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
tillrohrmann closed pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6886

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 0f4d5bade30..c4a08586be9 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -400,6 +400,15 @@ function tm_watchdog {
   done
 }
 
+function start_taskmanagers {
+    tmnum=$1
+    echo "Start ${tmnum} more task managers"
+    for (( c=0; c<tmnum; c++ )); do
+        $FLINK_DIR/bin/taskmanager.sh start
+    done
+}
+
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
 echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 
-backup_config
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
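The `start_taskmanagers` helper added to `common.sh` above is a plain counting loop around `taskmanager.sh start`. A minimal runnable sketch of the pattern — `taskmanager_start` is a hypothetical stub standing in for `$FLINK_DIR/bin/taskmanager.sh start`, so the sketch works outside a Flink distribution:

```shell
# Sketch of the start_taskmanagers pattern from common.sh.
# taskmanager_start is an illustrative stub; in the real script the loop
# body invokes $FLINK_DIR/bin/taskmanager.sh start.
started=0
taskmanager_start() { started=$((started + 1)); }

start_taskmanagers() {
    tmnum=$1
    echo "Start ${tmnum} more task managers"
    for (( c = 0; c < tmnum; c++ )); do
        taskmanager_start
    done
}

start_taskmanagers 3
echo "$started"    # 3
```

Centralizing the loop replaces the three copy-pasted `taskmanager.sh start` lines the diff removes from `test_batch_allround.sh`.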
[GitHub] tillrohrmann closed pull request #6885: [BP-1.6][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
tillrohrmann closed pull request #6885: [BP-1.6][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6885

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Commented] (FLINK-10614) Update test_batch_allround.sh e2e to new testing infrastructure
[ https://issues.apache.org/jira/browse/FLINK-10614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657215#comment-16657215 ]

ASF GitHub Bot commented on FLINK-10614:
----------------------------------------

tillrohrmann closed pull request #6884: [FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6884

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Update test_batch_allround.sh e2e to new testing infrastructure
> ---------------------------------------------------------------
>
>                 Key: FLINK-10614
>                 URL: https://issues.apache.org/jira/browse/FLINK-10614
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.5.4, 1.6.1, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> The {{test_batch_allround.sh}} still performs cleanup tasks that are also handled by {{test-runner-common.sh}}. This is no longer needed and should be the responsibility of the latter.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
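The `test_cleanup` function this change removes relies on bash's EXIT/INT traps to undo config modifications even when a test is interrupted. A self-contained sketch of that idiom — a temp file stands in for `flink-conf.yaml`, and the subshell plays the role of the test script, so all names here are illustrative:

```shell
# Trap-based cleanup idiom the e2e scripts used before the responsibility
# moved to test-runner-common.sh. A temp file stands in for flink-conf.yaml.
CONF=$(mktemp)
echo "original" > "$CONF"

(
    cp "$CONF" "$CONF.bak"                 # back up before modifying
    cleanup() {
        trap "exit 1" INT                  # don't re-enter on another interrupt
        trap "" EXIT                       # don't re-enter on normal exit
        mv -f "$CONF.bak" "$CONF"          # revert our modification
    }
    trap cleanup INT
    trap cleanup EXIT
    echo "modified" >> "$CONF"             # the "test" mutates the config
)                                          # subshell exit fires the EXIT trap

cat "$CONF"                                # prints: original
```

Because every test script had to repeat this boilerplate (and a forgotten trap left a mutated `flink-conf.yaml` behind), moving the cleanup into the shared runner is the simpler design.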
[GitHub] tillrohrmann closed pull request #6884: [FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
tillrohrmann closed pull request #6884: [FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
URL: https://github.com/apache/flink/pull/6884

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 2bb274b9557..f8f591f7dc0 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -24,30 +24,12 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 
 set_conf_ssl
 start_cluster
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/taskmanager.sh start
-
-function test_cleanup {
-  # don't call ourselves again for another signal interruption
-  trap "exit -1" INT
-  # don't call ourselves again for normal exit
-  trap "" EXIT
-
-  stop_cluster
-  $FLINK_DIR/bin/taskmanager.sh stop-all
-
-  # revert our modifications to the Flink distribution
-  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
-}
-trap test_cleanup INT
-trap test_cleanup EXIT
+start_taskmanagers 3
 
 $FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Assigned] (FLINK-10558) Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new code base
[ https://issues.apache.org/jira/browse/FLINK-10558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TisonKun reassigned FLINK-10558: Assignee: TisonKun > Port YARNHighAvailabilityITCase and YARNSessionCapacitySchedulerITCase to new > code base > --- > > Key: FLINK-10558 > URL: https://issues.apache.org/jira/browse/FLINK-10558 > Project: Flink > Issue Type: Sub-task >Reporter: vinoyang >Assignee: TisonKun >Priority: Minor > > {{YARNHighAvailabilityITCase}}, > {{YARNSessionCapacitySchedulerITCase#testClientStartup,}} > {{YARNSessionCapacitySchedulerITCase#testTaskManagerFailure}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-7351) test instability in JobClientActorRecoveryITCase#testJobClientRecovery
[ https://issues.apache.org/jira/browse/FLINK-7351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7351: - Fix Version/s: (was: 1.7.0) > test instability in JobClientActorRecoveryITCase#testJobClientRecovery > -- > > Key: FLINK-7351 > URL: https://issues.apache.org/jira/browse/FLINK-7351 > Project: Flink > Issue Type: Bug > Components: Job-Submission, Tests >Affects Versions: 1.3.2, 1.4.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3 > > > On a 16-core VM, the following test failed during {{mvn clean verify}} > {code} > Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 22.814 sec > <<< FAILURE! - in org.apache.flink.runtime.client.JobClientActorRecoveryITCase > testJobClientRecovery(org.apache.flink.runtime.client.JobClientActorRecoveryITCase) > Time elapsed: 21.299 sec <<< ERROR! > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: > org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: > Not enough free slots available to run the job. You can decrease the operator > parallelism or increase the number of slots per TaskManager in the > configuration. Resources available to scheduler: Number of instances=0, total > number of slots=0, available slots=0 > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:334) > at > org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:139) > at > org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:368) > at > org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:309) > at > org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596) > at > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:450) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleLazy(ExecutionGraph.java:834) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:814) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1425) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9000) Add automated cluster framework tests
[ https://issues.apache.org/jira/browse/FLINK-9000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9000: - Fix Version/s: (was: 1.7.0) > Add automated cluster framework tests > - > > Key: FLINK-9000 > URL: https://issues.apache.org/jira/browse/FLINK-9000 > Project: Flink > Issue Type: Task > Components: Tests >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.8.0 > > > Similar to the end-to-end tests which are added to the > {{flink-end-to-end-tests}} module, we should add cluster framework tests > which we can run automatically. > One idea how to launch a cluster framework would be to use Terraform to > allocate nodes from AWS or GCE. Next we could setup a different cluster > framework like Yarn or Mesos on which we run Flink. In order to simulate > different failure scenarios a framework like Jepsen could be used. > This issue serves as an umbrella issue to collect all issues which are > related to cluster framework tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9000) Add automated cluster framework tests
[ https://issues.apache.org/jira/browse/FLINK-9000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9000: - Fix Version/s: 1.8.0 > Add automated cluster framework tests > - > > Key: FLINK-9000 > URL: https://issues.apache.org/jira/browse/FLINK-9000 > Project: Flink > Issue Type: Task > Components: Tests >Reporter: Till Rohrmann >Priority: Critical > Fix For: 1.8.0 > > > Similar to the end-to-end tests which are added to the > {{flink-end-to-end-tests}} module, we should add cluster framework tests > which we can run automatically. > One idea how to launch a cluster framework would be to use Terraform to > allocate nodes from AWS or GCE. Next we could setup a different cluster > framework like Yarn or Mesos on which we run Flink. In order to simulate > different failure scenarios a framework like Jepsen could be used. > This issue serves as an umbrella issue to collect all issues which are > related to cluster framework tests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10527) Cleanup constant isNewMode in YarnTestBase
[ https://issues.apache.org/jira/browse/FLINK-10527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657026#comment-16657026 ]

ASF GitHub Bot commented on FLINK-10527:
----------------------------------------

yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431419776

@tillrohrmann I have updated this PR and kept only the change for class `YARNSessionFIFOITCase`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Cleanup constant isNewMode in YarnTestBase
> ------------------------------------------
>
>                 Key: FLINK-10527
>                 URL: https://issues.apache.org/jira/browse/FLINK-10527
>             Project: Flink
>          Issue Type: Sub-task
>          Components: YARN
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> This seems to be a residual problem with FLINK-10396, where the constant was set to true. It currently has three usage scenarios:
> 1. In an assertion, where it now causes an error:
> {code:java}
> assumeTrue("The new mode does not start TMs upfront.", !isNewMode);
> {code}
> 2. In {{if (!isNewMode)}} blocks, whose logic is never invoked, so the blocks can be removed.
> 3. In {{if (isNewMode)}} blocks, which are always invoked, so the if statements can be removed.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
yanghua commented on issue #6816: [FLINK-10527] Cleanup constant isNewMode in YarnTestBase
URL: https://github.com/apache/flink/pull/6816#issuecomment-431419776

@tillrohrmann I have updated this PR and kept only the change for class `YARNSessionFIFOITCase`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Assigned] (FLINK-10357) Streaming File Sink end-to-end test failed with mismatch
[ https://issues.apache.org/jira/browse/FLINK-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-10357: Assignee: Gary Yao > Streaming File Sink end-to-end test failed with mismatch > > > Key: FLINK-10357 > URL: https://issues.apache.org/jira/browse/FLINK-10357 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Critical > Labels: test-stability > Fix For: 1.6.3, 1.7.0 > > Attachments: flink-logs.tgz, flink-streaming-file-sink-logs.tgz > > > The {{Streaming File Sink end-to-end test}} failed on an Amazon instance with > the following result: > {code} > FAIL File Streaming Sink: Output hash mismatch. Got > f2000bbc18a889dc8ec4b6f2b47bf9f5, expected 6727342fdd3aae2129e61fc8f433fb6f. > head hexdump of actual: > 000 0 \n 1 \n 2 \n 3 \n 4 \n 5 \n 6 \n 7 \n > 010 8 \n 9 \n > 014 > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10616) Jepsen test fails while tearing down Hadoop
[ https://issues.apache.org/jira/browse/FLINK-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-10616: - Description: While tearing down Hadoop, the tests sporadically fail with the exception below: {noformat} Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned non-zero exit status 123 on 172.31.39.235. STDOUT: STDERR: at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) ~[jepsen-0.1.10.jar:na] at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec_STAR_.doInvoke(control.clj:163) ~[jepsen-0.1.10.jar:na] at clojure.lang.RestFn.applyTo(RestFn.java:137) [clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na] at clojure.core$apply.invoke(core.clj:652) ~[clojure-1.9.0.jar:na] at jepsen.control$exec.invokeStatic(control.clj:182) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec.doInvoke(control.clj:176) ~[jepsen-0.1.10.jar:na] at clojure.lang.RestFn.invoke(RestFn.java:2088) [clojure-1.9.0.jar:na] at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:197) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:194) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) ~[classes/:na] at jepsen.flink.hadoop$db$reify__3102.teardown_BANG_(hadoop.clj:128) ~[classes/:na] at jepsen.flink.db$combined_db$reify__217$fn__220.invoke(db.clj:119) ~[na:na] at clojure.core$map$fn__5587.invoke(core.clj:2745) ~[clojure-1.9.0.jar:na] at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.9.0.jar:na] at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.9.0.jar:na] at clojure.lang.RT.seq(RT.java:528) ~[clojure-1.9.0.jar:na] at 
clojure.core$seq__5124.invokeStatic(core.clj:137) ~[clojure-1.9.0.jar:na] at clojure.core$dorun.invokeStatic(core.clj:3125) ~[clojure-1.9.0.jar:na] at clojure.core$doall.invokeStatic(core.clj:3140) ~[clojure-1.9.0.jar:na] at clojure.core$doall.invoke(core.clj:3140) ~[clojure-1.9.0.jar:na] at jepsen.flink.db$combined_db$reify__217.teardown_BANG_(db.clj:119) ~[na:na] at jepsen.db$fn__2137$G__2133__2141.invoke(db.clj:8) ~[jepsen-0.1.10.jar:na] at jepsen.db$fn__2137$G__2132__2146.invoke(db.clj:8) ~[jepsen-0.1.10.jar:na] at clojure.core$partial$fn__5561.invoke(core.clj:2617) ~[clojure-1.9.0.jar:na] at jepsen.control$on_nodes$fn__2116.invoke(control.clj:372) ~[jepsen-0.1.10.jar:na] at clojure.lang.AFn.applyToHelper(AFn.java:154) ~[clojure-1.9.0.jar:na] at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na] at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965) ~[clojure-1.9.0.jar:na] at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965) ~[clojure-1.9.0.jar:na] at clojure.lang.RestFn.applyTo(RestFn.java:142) [clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:661) ~[clojure-1.9.0.jar:na] at clojure.core$bound_fn_STAR_$fn__5471.doInvoke(core.clj:1995) ~[clojure-1.9.0.jar:na] at clojure.lang.RestFn.invoke(RestFn.java:408) [clojure-1.9.0.jar:na] at jepsen.util$real_pmap$launcher__1168$fn__1169.invoke(util.clj:49) ~[jepsen-0.1.10.jar:na] at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022) ~[clojure-1.9.0.jar:na] at clojure.lang.AFn.call(AFn.java:18) ~[clojure-1.9.0.jar:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_171] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171] {noformat} was: While tearing down Hadoop, the test 
fails with the exception below: {noformat} Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned non-zero exit status 123 on 172.31.39.235. STDOUT: STDERR: at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) ~[jepsen-0.1.10.jar:na] at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) ~[jepsen-0.1.10.jar:na]
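The exit status 123 in the exception above is characteristic of `xargs`: GNU `xargs` by default still runs its command once even when the input is empty, so when `grep hadoop` matches nothing, `kill -9` is invoked without a PID, fails, and `xargs` reports 123 ("any invocation exited with status 1–125"). A minimal reproduction with harmless stand-ins — `false` replaces the failing `kill -9`; `-r`/`--no-run-if-empty` is one common fix, appending `|| true` is another:

```shell
# Failure mode: GNU xargs runs its command once even on empty input,
# so `false` (standing in for `kill -9` with no PID) fails -> status 123.
status=0
printf '' | xargs false || status=$?
echo "without -r: exit status $status"     # 123 with GNU xargs

# Fix: -r / --no-run-if-empty skips the invocation when there is nothing
# to kill, so "no hadoop processes found" is no longer an error.
printf '' | xargs -r false && echo "with -r: exit status 0"
```

This matches the sporadic nature of the Jepsen teardown failure: the pipeline only errors on nodes where no Hadoop processes are left to kill.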
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656964#comment-16656964 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226691526 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( Review comment: Sometimes it can be a good idea to separate multiple precondition checks into a separate method so that it is easier to focus on the main logic of the function. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement state conversion procedure in state backends > -- > > Key: FLINK-9808 > URL: https://issues.apache.org/jira/browse/FLINK-9808 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9377 in place and that config snapshots serve as the single source > of truth for recreating restore serializers, the next step would be to > utilize this when performing a full-pass state conversion (i.e., read with > old / restore serializer, write with new serializer). > For Flink's heap-based backends, it can be seen that state conversion > inherently happens, since all state is always deserialized after restore with > the restore serializer, and written with the new serializer on snapshots. > For the RocksDB state backend, since state is lazily deserialized, state > conversion needs to happen for per-registered state on their first access if > the registered new serializer has a different serialization schema than the > previous serializer. > This task should consist of three parts: > 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the > new serializer's schema is a) compatible with the serializer as it is, b) > compatible after the serializer has been reconfigured, or c) incompatible. > 2. Introduce state conversion procedures in the RocksDB state backend. This > should occur on the first state access. > 3. Make sure that all other backends no longer do redundant serializer > compatibility checks. That is not required because those backends always > perform full-pass state conversions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656953#comment-16656953 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226669120 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above) Review comment: It seems that now both branches in the calling method `tryRegisterKvStateInformation` create a `newMetaInfo`. Why not always create it there and pass it as the only argument to this function, which can then transform it if needed? Could deduplicate and simplify some code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656965#comment-16656965 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226689891 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656963#comment-16656963 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226676987 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ## @@ -269,10 +269,11 @@ public HeapKeyedStateBackend( "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + " but its corresponding restored snapshot cannot be found."); - newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility( - restoredMetaInfoSnapshot, + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( Review comment: We can now move this outside the if and initialize `newMetaInfo` directly for all cases to reduce code duplication. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
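The deduplication the reviewer asks for (constructing the meta info once, unconditionally, and letting the restore path only adjust it) could be sketched as follows. The types and method names here are simplified placeholders, not the actual Flink classes.

```java
public class DedupSketch {

    static class MetaInfo {
        final String name;
        boolean migrated;
        MetaInfo(String name) { this.name = name; }
    }

    // Before the suggested refactoring, the "fresh state" and the
    // "restored state" branches each built their own MetaInfo. After it,
    // the object is created once and the restore branch merely
    // transforms the single instance if a migration is needed.
    static MetaInfo register(String name, boolean restoredSnapshotExists) {
        MetaInfo newMetaInfo = new MetaInfo(name); // created once, for all cases
        if (restoredSnapshotExists) {
            migrateIfNecessary(newMetaInfo); // adjust in place when restoring
        }
        return newMetaInfo;
    }

    static void migrateIfNecessary(MetaInfo metaInfo) {
        metaInfo.migrated = true; // stand-in for the real compatibility check
    }
}
```

Only one construction site has to stay in sync with the constructor's argument list, which is the duplication the review flags in both backends.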
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656962#comment-16656962 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226687515 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656957#comment-16656957 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226688632 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656952#comment-16656952 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226685552 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656958#comment-16656958 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226684033 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( + byte[] serializedOldValue, + TypeSerializer priorSerializer, + TypeSerializer newSerializer) throws StateMigrationException { + + try { + V value = priorSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue))); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); Review comment: This method is called in a loop. You should avoid creating streams and other temporary objects on each invocation. Instead, let the calling loop pass them in and reuse the same instances for the whole process. If we had a `StateValueTransformer` instance, they could also hold them as members. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
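The reviewer's point above (the streams are created inside a per-value loop and should instead be allocated once by the caller and reused) can be sketched as below. The method names and the trivial uppercase "migration" are illustrative placeholders, not the PR's actual logic.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class StreamReuseSketch {

    static List<byte[]> migrateAll(List<byte[]> oldValues) {
        List<byte[]> result = new ArrayList<>();
        // Allocated once, outside the loop, and reused for every value,
        // instead of constructing fresh streams on each invocation.
        ByteArrayOutputStream reusable = new ByteArrayOutputStream();
        for (byte[] old : oldValues) {
            reusable.reset(); // clear the buffer; no reallocation
            result.add(migrateSerializedValue(old, reusable));
        }
        return result;
    }

    // "Migration" here is a trivial uppercase re-encode just to have
    // observable behavior; real code would deserialize with the prior
    // serializer and serialize with the new one into the shared buffer.
    static byte[] migrateSerializedValue(byte[] oldValue, ByteArrayOutputStream reusable) {
        String v = new String(oldValue, StandardCharsets.UTF_8);
        byte[] converted = v.toUpperCase().getBytes(StandardCharsets.UTF_8);
        reusable.write(converted, 0, converted.length);
        return reusable.toByteArray();
    }
}
```

`ByteArrayOutputStream.reset()` keeps the backing array, so a loop over many state entries avoids one buffer allocation (and its growth copies) per value.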
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656954#comment-16656954 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226662102 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above) + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( Review comment: This new code is calling methods from a deprecated class. I suggest either removing the deprecation or using the newer replacement. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656959#comment-16656959 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226686907 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java (quoted diff identical to the first comment above)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656955#comment-16656955 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226680634 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." + + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { +
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656951#comment-16656951 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226683613 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( + byte[] serializedOldValue, + TypeSerializer priorSerializer, + TypeSerializer newSerializer) throws StateMigrationException { + + try { + V value = priorSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue))); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); Review comment: Better use `DataInputDeserializer` / `DataOutputSerializer` instead of wrapped byte-array streams. They are quite a bit faster. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Implement state conversion procedure in state backends > -- > > Key: FLINK-9808 > URL: https://issues.apache.org/jira/browse/FLINK-9808 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Aljoscha Krettek >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0 > > > With FLINK-9377 in place and that config snapshots serve as the single source > of truth for recreating restore serializers, the next step would be to > utilize this when performing a full-pass state conversion (i.e., read with > old / restore serializer, write with new serializer). > For Flink's heap-based backends, it can be seen that state conversion > inherently happens, since all state is always deserialized after restore with > the restore serializer, and written with the new serializer on snapshots. > For the RocksDB state backend, since state is lazily deserialized, state > conversion needs to happen for per-registered state on their first access if > the registered new serializer has a different serialization schema than the > previous serializer. > This task should consist of three parts: > 1. Allow {{CompatibilityResult}} to correctly distinguish between whether the > new serializer's schema is a) compatible with the serializer as it is, b) > compatible after the serializer has been reconfigured, or c) incompatible. > 2. Introduce state conversion procedures in the RocksDB state backend. This > should occur on the first state access. > 3. Make sure that all other backends no longer do redundant serializer > compatibility checks. That is not required because those backends always > perform full-pass state conversions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
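The reviewer's suggestion above refers to Flink's array-backed `DataInputDeserializer` / `DataOutputSerializer`, which write directly into a byte array instead of going through `ByteArrayOutputStream` plus a stream-wrapper object per call. As a rough, self-contained sketch of why that is faster, the `SimpleOutputSerializer` class below is a hypothetical minimal analogue (it is not Flink's implementation): one growable buffer, no intermediate stream objects, and a `clear()` for reuse.

```java
import java.util.Arrays;

// Hypothetical minimal analogue of Flink's DataOutputSerializer:
// appends directly into a growable byte array, with no stream wrappers.
class SimpleOutputSerializer {
    private byte[] buffer;
    private int position;

    SimpleOutputSerializer(int initialCapacity) {
        this.buffer = new byte[initialCapacity];
    }

    void writeInt(int v) {
        ensureCapacity(4);
        buffer[position++] = (byte) (v >>> 24);
        buffer[position++] = (byte) (v >>> 16);
        buffer[position++] = (byte) (v >>> 8);
        buffer[position++] = (byte) v;
    }

    /** Resets the write position so the same instance can be reused. */
    void clear() {
        position = 0;
    }

    /** Returns a copy of exactly the bytes written so far. */
    byte[] getCopyOfBuffer() {
        return Arrays.copyOf(buffer, position);
    }

    private void ensureCapacity(int more) {
        if (position + more > buffer.length) {
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, position + more));
        }
    }

    public static void main(String[] args) {
        SimpleOutputSerializer out = new SimpleOutputSerializer(16);
        out.writeInt(42);
        byte[] bytes = out.getCopyOfBuffer();
        System.out.println(bytes.length);                 // 4
        System.out.println(bytes[3] & 0xFF);              // 42
        out.clear();                                      // reuse without reallocating
        System.out.println(out.getCopyOfBuffer().length); // 0
    }
}
```

The design point is that each `ByteArrayInputStream`/`DataInputViewStreamWrapper` pair in the reviewed code is a short-lived allocation plus an extra indirection per read or write, which adds up when migration touches every stored value.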
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656950#comment-16656950 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r22394 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) Review comment: I suggest comparing enums with `!=` instead of `!Objects.equals` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
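The `!=` suggestion works because Java enum constants are singletons: reference comparison never throws even for `null` operands, and the compiler rejects a comparison between unrelated enum types, whereas `Objects.equals` would silently return `false`. A small standalone sketch (the `Type` enum below is an illustrative stand-in, not Flink's `StateDescriptor.Type`):

```java
// Illustrative stand-in for StateDescriptor.Type (not Flink's actual enum).
enum Type { UNKNOWN, VALUE, LIST }

class EnumCompareDemo {
    // Equivalent to !Objects.equals(a, b) for enums, but simpler:
    // reference comparison is correct because each constant is a singleton,
    // and it is null-safe in the sense that it never throws.
    static boolean differs(Type a, Type b) {
        return a != b;
    }

    public static void main(String[] args) {
        System.out.println(differs(Type.VALUE, Type.LIST));  // true
        System.out.println(differs(Type.VALUE, Type.VALUE)); // false
        System.out.println(differs(null, Type.UNKNOWN));     // true, no NPE
    }
}
```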
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656960#comment-16656960 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226690598 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( Review comment: I have an ongoing effort to shrink the size of the `RocksDBKeyedStateBackend` class. I think it is a good idea to move the migration concern into a separate class file if possible, to keep different concerns separated wherever possible. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656956#comment-16656956 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226663119 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); Review comment: This line and the following check are already done by the caller in exactly the same way. Wouldn't it make more sense to then pass `StateMetaInfoSnapshot` as argument from the caller? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9808) Implement state conversion procedure in state backends
[ https://issues.apache.org/jira/browse/FLINK-9808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656961#comment-16656961 ] ASF GitHub Bot commented on FLINK-9808: --- StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226679606 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( Review comment: I think this method does not belong here. This class is used to interact with the state during event processing; I suggest keeping that separated from migration concerns. If you are looking for a way to differentiate the logic for different states, you could also just use a factory that gives you different implementations of some `StateValueTransformer`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
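The factory idea from the review above can be sketched as follows. Note that none of these types are Flink API: `StateValueTransformer` is the name the reviewer floats, and the `StateKind` enum, the factory class, and the per-kind behaviours are hypothetical placeholders. A real value-state transformer would deserialize with the old serializer and re-serialize with the new one, and a list-state transformer would do that per element after splitting on the list delimiter; here each byte simply stands in for one "element".

```java
// Hypothetical interface named after the reviewer's suggestion.
interface StateValueTransformer {
    byte[] transform(byte[] oldBytes);
}

class StateValueTransformers {
    enum StateKind { VALUE, LIST }

    // The migration loop asks the factory once and then applies the
    // returned transformer uniformly; per-state-kind logic lives here,
    // keeping AbstractRocksDBState free of migration concerns.
    static StateValueTransformer forKind(StateKind kind, StateValueTransformer element) {
        switch (kind) {
            case VALUE:
                // value state holds a single serialized value
                return element;
            case LIST:
                // placeholder: treat each byte as one list element and
                // migrate them individually
                return oldBytes -> {
                    byte[] out = new byte[oldBytes.length];
                    for (int i = 0; i < oldBytes.length; i++) {
                        out[i] = element.transform(new byte[] {oldBytes[i]})[0];
                    }
                    return out;
                };
            default:
                throw new IllegalArgumentException("Unknown state kind: " + kind);
        }
    }
}
```

Usage would look like `StateValueTransformers.forKind(StateKind.LIST, elementMigration).transform(storedBytes)`, with the loop never needing to know which state flavour it is migrating.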
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226684033 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( + byte[] serializedOldValue, + TypeSerializer priorSerializer, + TypeSerializer newSerializer) throws StateMigrationException { + + try { + V value = priorSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue))); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); Review comment: This method is called in a loop. You should avoid creating streams and other temporary objects on each invocation. Instead, let the calling loop pass them in and reuse the same instances for the whole process. If we had a `StateValueTransformer` instance, it could also hold them as members. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
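The "create once, reuse in the loop" advice can be sketched with plain JDK streams: allocate the output buffer outside the loop and `reset()` it per iteration, instead of constructing fresh stream objects for every value. The migration itself here (widening a stored `int` to a `long`) is purely illustrative, and in Flink one would reuse a `DataOutputSerializer` the same way.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of reusing serialization buffers across a migration loop.
class ReuseBuffersDemo {
    static List<byte[]> migrateAll(List<Integer> oldValues) throws IOException {
        List<byte[]> migrated = new ArrayList<>();
        // Allocate the stream objects once, outside the loop...
        ByteArrayOutputStream baos = new ByteArrayOutputStream(64);
        DataOutputStream out = new DataOutputStream(baos);
        for (int v : oldValues) {
            baos.reset();     // ...and reuse them on each iteration
            out.writeLong(v); // re-serialize in the illustrative "new" format
            out.flush();
            migrated.add(baos.toByteArray());
        }
        return migrated;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> result = migrateAll(List.of(1, 2, 3));
        System.out.println(result.size());        // 3
        System.out.println(result.get(0).length); // 8 bytes: a long each
    }
}
```

Hoisting the allocations matters because the migration loop runs once per stored entry, so per-iteration garbage scales with state size rather than staying constant.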
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226687515 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." 
+ + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { + migrateStateValue(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo); + } else { + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + newMetaInfo.getStat
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226662102 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( Review comment: This new code is calling methods from a deprecated class. I suggest to either remove the deprecation or to use the newer replacement. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226685552 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." 
+ + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { + migrateStateValue(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo); + } else { + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + newMetaInfo.getStat
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226683613 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( + byte[] serializedOldValue, + TypeSerializer priorSerializer, + TypeSerializer newSerializer) throws StateMigrationException { + + try { + V value = priorSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(serializedOldValue))); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); Review comment: Better to use `DataInputDeserializer` / `DataOutputSerializer` instead of wrapped byte-array streams. They are quite a bit faster. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
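For illustration, here is a minimal, self-contained sketch of the deserialize-then-reserialize migration pattern the excerpt implements. It uses plain `java.io` stream wrappers so it runs outside Flink; the class name and the int-to-long "format change" are hypothetical stand-ins for the prior and new `TypeSerializer`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MigrateSketch {

    // Migrate a serialized value: read it in the prior format (a 4-byte int
    // here, standing in for the prior serializer), then write it in the new
    // format (an 8-byte long, standing in for the new serializer).
    static byte[] migrateSerializedValue(byte[] serializedOldValue) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedOldValue));
        int value = in.readInt();

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeLong(value);
        out.flush();
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new DataOutputStream(baos).writeInt(42);

        byte[] migrated = migrateSerializedValue(baos.toByteArray());
        System.out.println(migrated.length);  // 8
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(migrated));
        System.out.println(in.readLong());    // 42
    }
}
```

In the PR's actual code, Flink's `DataInputDeserializer`/`DataOutputSerializer` would take the place of the wrapped byte-array streams, operating on byte arrays directly.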
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226669120 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( Review comment: It seems that now both branches in the calling method `tryRegisterKvStateInformation` create a `newMetaInfo`. Why not always create it there and pass it as only argument to this function which can then transform it if needed? Could deduplicate and simplify some code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226676987 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ## @@ -269,10 +269,11 @@ public HeapKeyedStateBackend( "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + " but its corresponding restored snapshot cannot be found."); - newMetaInfo = RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility( - restoredMetaInfoSnapshot, + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( Review comment: We can now move this outside the if and initialize `newMetaInfo` directly for all cases to reduce code duplication. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226679606 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -154,6 +159,29 @@ public void setCurrentNamespace(N namespace) { return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer()); } + public byte[] migrateSerializedValue( Review comment: I think this method does not belong here. This class is used to interact with the state during event processing, so I suggest keeping that separated from migration concerns. If you are looking for a way to differentiate the logic for different states, you could also just use a factory that gives you different implementations of some `StateValueTransformer`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
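To make the factory idea concrete, a minimal sketch follows. All names here are hypothetical: `StateValueTransformer` and the state types are illustrative stand-ins suggested by the review, not existing Flink API.

```java
public class TransformerFactorySketch {

    // Hypothetical state kinds, standing in for the different state classes.
    enum StateType { VALUE, LIST }

    // Hypothetical transformer interface: rewrites one serialized value
    // during migration.
    interface StateValueTransformer {
        byte[] transform(byte[] serialized);
    }

    // Factory returning a state-type-specific implementation, so migration
    // logic stays out of the classes used during event processing.
    static StateValueTransformer forType(StateType type) {
        switch (type) {
            case VALUE:
                return serialized -> serialized;          // plain values pass through
            case LIST:
                return serialized -> serialized.clone();  // lists might re-encode entries
            default:
                throw new IllegalArgumentException("Unknown state type: " + type);
        }
    }

    public static void main(String[] args) {
        byte[] in = {1, 2, 3};
        System.out.println(forType(StateType.VALUE).transform(in).length);  // 3
    }
}
```

The point of the design is separation of concerns: `AbstractRocksDBState` keeps only event-processing logic, while per-state migration behavior lives behind the factory.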
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226688632 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." 
+ + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { + migrateStateValue(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo); + } else { + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + newMetaInfo.getStat
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226663119 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); Review comment: This line and the following check are already done by the caller in exactly the same way. Wouldn't it make more sense to pass the `StateMetaInfoSnapshot` as an argument from the caller? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226691526 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( Review comment: Sometimes it can be a good idea to extract multiple precondition checks into a separate method so that it is easier to focus on the main logic of the function. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
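As a sketch of that refactoring — helper and field names are illustrative, and a plain `IllegalStateException` stands in for Flink's `Preconditions` — the grouped checks might look like:

```java
import java.util.Objects;

public class PreconditionExtractionSketch {

    // Minimal stand-in for the restored state meta info snapshot.
    static class RestoredSnapshot {
        final String name;
        RestoredSnapshot(String name) { this.name = name; }
    }

    static void checkState(boolean condition, String message) {
        if (!condition) throw new IllegalStateException(message);
    }

    // All compatibility preconditions grouped in one helper, so the caller's
    // main migration logic reads without interruption.
    static void checkRestoredSnapshotCompatibility(RestoredSnapshot restored, String registeredName) {
        checkState(restored != null,
            "Requested to check compatibility of a restored state meta info,"
                + " but its corresponding restored snapshot cannot be found.");
        checkState(Objects.equals(registeredName, restored.name),
            "Incompatible state names. Was [" + restored.name
                + "], registered with [" + registeredName + "].");
    }

    public static void main(String[] args) {
        checkRestoredSnapshotCompatibility(new RestoredSnapshot("window-state"), "window-state");
        System.out.println("checks passed");
    }
}
```

The caller then reduces to one line, `checkRestoredSnapshotCompatibility(restored, stateDesc.getName());`, before the actual migration logic.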
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r22394 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. " + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) Review comment: I suggest comparing enums with `!=` instead of `!Objects.equals`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
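The enum-comparison point in runnable form: enum constants are singletons, so reference comparison with `!=` gives the same result as `!Objects.equals` (including for `null` operands) and reads more directly. The `Type` enum below is a hypothetical stand-in mirroring `StateDescriptor.Type` in shape only.

```java
import java.util.Objects;

public class EnumCompareSketch {

    // Hypothetical type enum, illustrative only.
    enum Type { UNKNOWN, VALUE, LIST }

    public static void main(String[] args) {
        Type registered = Type.VALUE;

        // Reference comparison is safe for enums: each constant exists exactly once.
        System.out.println(registered != Type.UNKNOWN);                 // true
        System.out.println(!Objects.equals(registered, Type.UNKNOWN));  // true, same result

        // Both forms are also null-safe.
        Type missing = null;
        System.out.println(missing != Type.UNKNOWN);                    // true
        System.out.println(!Objects.equals(missing, Type.UNKNOWN));     // true
    }
}
```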
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226680634 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." 
+ + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { + migrateStateValue(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo); + } else { + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + newMetaInfo.getStat
[GitHub] StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends
StefanRRichter commented on a change in pull request #6875: [FLINK-9808] [state backends] Migrate state when necessary in state backends URL: https://github.com/apache/flink/pull/6875#discussion_r226689891 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ## @@ -1389,6 +1388,149 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyValueStateBackendMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2 stateInfo, + @Nullable StateSnapshotTransformer snapshotTransformer) throws Exception { + + StateMetaInfoSnapshot restoredMetaInfoSnapshot = restoredKvStateMetaInfos.get(stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + Preconditions.checkState(restoredMetaInfoSnapshot.getBackendStateType() + == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE, + "Incompatible state types. " + + "Was [" + restoredMetaInfoSnapshot.getBackendStateType() + "], " + + "registered as [" + StateMetaInfoSnapshot.BackendStateType.KEY_VALUE + "]."); + + Preconditions.checkState( + Objects.equals(stateDesc.getName(), restoredMetaInfoSnapshot.getName()), + "Incompatible state names. 
" + + "Was [" + restoredMetaInfoSnapshot.getName() + "], " + + "registered with [" + stateDesc.getName() + "]."); + + final StateDescriptor.Type restoredType = + StateDescriptor.Type.valueOf( + restoredMetaInfoSnapshot.getOption( + StateMetaInfoSnapshot.CommonOptionsKeys.KEYED_STATE_TYPE)); + + if (!Objects.equals(stateDesc.getType(), StateDescriptor.Type.UNKNOWN) + && !Objects.equals(restoredType, StateDescriptor.Type.UNKNOWN)) { + + Preconditions.checkState( + stateDesc.getType() == restoredType, + "Incompatible key/value state types. " + + "Was [" + restoredType + "], " + + "registered with [" + stateDesc.getType() + "]."); + } + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyValueStateBackendMetaInfo newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer, + snapshotTransformer); + + CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()), + namespaceSerializer); + + CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + null, + restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()), + stateSerializer); + + if (namespaceCompatibility.isRequiresMigration()) { + throw new UnsupportedOperationException("The new namespace serializer requires state migration in order for the job to proceed." 
+ + " However, migration for state namespace currently isn't supported."); + } + + if (stateCompatibility.isRequiresMigration()) { + migrateStateValue(stateDesc, stateInfo, restoredMetaInfoSnapshot, newMetaInfo); + } else { + newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( + newMetaInfo.getStat
[jira] [Created] (FLINK-10616) Jepsen test fails while tearing down Hadoop
Gary Yao created FLINK-10616: Summary: Jepsen test fails while tearing down Hadoop Key: FLINK-10616 URL: https://issues.apache.org/jira/browse/FLINK-10616 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.6.1, 1.7.0 Reporter: Gary Yao While tearing down Hadoop, the test fails with the exception below: {noformat} Caused by: java.lang.RuntimeException: sudo -S -u root bash -c "cd /; ps aux | grep hadoop | grep -v grep | awk \"\{print \\\$2}\" | xargs kill -9" returned non-zero exit status 123 on 172.31.39.235. STDOUT: STDERR: at jepsen.control$throw_on_nonzero_exit.invokeStatic(control.clj:129) ~[jepsen-0.1.10.jar:na] at jepsen.control$throw_on_nonzero_exit.invoke(control.clj:122) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec_STAR_.invokeStatic(control.clj:166) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec_STAR_.doInvoke(control.clj:163) ~[jepsen-0.1.10.jar:na] at clojure.lang.RestFn.applyTo(RestFn.java:137) [clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na] at clojure.core$apply.invoke(core.clj:652) ~[clojure-1.9.0.jar:na] at jepsen.control$exec.invokeStatic(control.clj:182) ~[jepsen-0.1.10.jar:na] at jepsen.control$exec.doInvoke(control.clj:176) ~[jepsen-0.1.10.jar:na] at clojure.lang.RestFn.invoke(RestFn.java:2088) [clojure-1.9.0.jar:na] at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:197) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invokeStatic(util.clj:194) ~[classes/:na] at jepsen.control.util$grepkill_BANG_.invoke(util.clj:191) ~[classes/:na] at jepsen.flink.hadoop$db$reify__3102.teardown_BANG_(hadoop.clj:128) ~[classes/:na] at jepsen.flink.db$combined_db$reify__217$fn__220.invoke(db.clj:119) ~[na:na] at clojure.core$map$fn__5587.invoke(core.clj:2745) ~[clojure-1.9.0.jar:na] at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.9.0.jar:na] at clojure.lang.LazySeq.seq(LazySeq.java:49) 
~[clojure-1.9.0.jar:na] at clojure.lang.RT.seq(RT.java:528) ~[clojure-1.9.0.jar:na] at clojure.core$seq__5124.invokeStatic(core.clj:137) ~[clojure-1.9.0.jar:na] at clojure.core$dorun.invokeStatic(core.clj:3125) ~[clojure-1.9.0.jar:na] at clojure.core$doall.invokeStatic(core.clj:3140) ~[clojure-1.9.0.jar:na] at clojure.core$doall.invoke(core.clj:3140) ~[clojure-1.9.0.jar:na] at jepsen.flink.db$combined_db$reify__217.teardown_BANG_(db.clj:119) ~[na:na] at jepsen.db$fn__2137$G__2133__2141.invoke(db.clj:8) ~[jepsen-0.1.10.jar:na] at jepsen.db$fn__2137$G__2132__2146.invoke(db.clj:8) ~[jepsen-0.1.10.jar:na] at clojure.core$partial$fn__5561.invoke(core.clj:2617) ~[clojure-1.9.0.jar:na] at jepsen.control$on_nodes$fn__2116.invoke(control.clj:372) ~[jepsen-0.1.10.jar:na] at clojure.lang.AFn.applyToHelper(AFn.java:154) ~[clojure-1.9.0.jar:na] at clojure.lang.AFn.applyTo(AFn.java:144) ~[clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:657) ~[clojure-1.9.0.jar:na] at clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1965) ~[clojure-1.9.0.jar:na] at clojure.core$with_bindings_STAR_.doInvoke(core.clj:1965) ~[clojure-1.9.0.jar:na] at clojure.lang.RestFn.applyTo(RestFn.java:142) [clojure-1.9.0.jar:na] at clojure.core$apply.invokeStatic(core.clj:661) ~[clojure-1.9.0.jar:na] at clojure.core$bound_fn_STAR_$fn__5471.doInvoke(core.clj:1995) ~[clojure-1.9.0.jar:na] at clojure.lang.RestFn.invoke(RestFn.java:408) [clojure-1.9.0.jar:na] at jepsen.util$real_pmap$launcher__1168$fn__1169.invoke(util.clj:49) ~[jepsen-0.1.10.jar:na] at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022) ~[clojure-1.9.0.jar:na] at clojure.lang.AFn.call(AFn.java:18) ~[clojure-1.9.0.jar:na] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_171] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_171] at 
java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171] {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
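A note on the failure above, with a hedged sketch: exit status 123 is produced by `xargs`, which exits 123 when any invocation of the command it launches (here `kill -9`) fails — typically because a PID matched by `ps` exited between the listing and the `kill`. The one-liner below is an illustrative alternative, not the Jepsen source: `pkill` matches and signals in one step, and `|| true` tolerates its exit status of 1 when no process matches.

```shell
# Illustrative teardown variant (assumes procps pkill is available).
# pkill avoids the ps|grep|awk|xargs race, and "|| true" makes
# "no matching process" a non-error, so the harness does not fail.
pkill -9 -f hadoop || true
```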
[GitHub] tillrohrmann commented on a change in pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up
tillrohrmann commented on a change in pull request #6886: [BP-1.5][FLINK-10614] Let test_batch_allround.sh rely on test-runner-common.sh for clean up URL: https://github.com/apache/flink/pull/6886#discussion_r226690899 ## File path: flink-end-to-end-tests/test-scripts/test_batch_allround.sh ## @@ -24,31 +24,13 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll echo "Run DataSet-Allround-Test Program" # modify configuration to include spilling to disk -cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml backup_config Review comment: You're right, this command must be moved above the two `echo` lines. The config will be reverted by the `test-runner-common.sh`. With FLINK-10094 the backup moved to this script as well but it is not part of `release-1.5`. I will move this command before the two echos. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
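The ordering fix discussed in the review comment can be sketched as follows. This is a self-contained illustration, not the actual test script: `backup_config` is assumed to copy `flink-conf.yaml` aside (as the later Flink test-scripts do), and `FLINK_DIR` defaults to a temp directory here so the sketch runs standalone.

```shell
# Stand-in environment so the sketch is self-contained.
FLINK_DIR="${FLINK_DIR:-$(mktemp -d)}"
mkdir -p "$FLINK_DIR/conf"
touch "$FLINK_DIR/conf/flink-conf.yaml"

# Assumed helper: copies the config aside for later restoration.
backup_config() {
    cp "$FLINK_DIR/conf/flink-conf.yaml" "$FLINK_DIR/conf/flink-conf.yaml.bak"
}

# Back up *before* mutating the config, so the cleanup hook in
# test-runner-common.sh restores the pristine file afterwards.
backup_config
echo "taskmanager.network.memory.min: 10485760" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "taskmanager.network.memory.max: 10485760" >> "$FLINK_DIR/conf/flink-conf.yaml"
```

With the backup taken first, the `.bak` file never contains the test-specific overrides, which is exactly what the reviewer asked for.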
[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks
[ https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5601: - Affects Version/s: 1.7.0 > Window operator does not checkpoint watermarks > -- > > Key: FLINK-5601 > URL: https://issues.apache.org/jira/browse/FLINK-5601 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Ufuk Celebi >Assignee: Jiayi Liao >Priority: Critical > Fix For: 1.8.0 > > > During release testing [~stefanrichte...@gmail.com] and I noticed that > watermarks are not checkpointed in the window operator. > This can lead to non-determinism when restoring checkpoints. I was running an > adjusted {{SessionWindowITCase}} via Kafka for testing migration and > rescaling and ran into failures, because the data generator required > deterministic behaviour. > What happened was that on restore it could happen that late elements were not > dropped, because the watermarks needed to be re-established after restore > first. > [~aljoscha] Do you know whether there is a special reason for explicitly not > checkpointing watermarks? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-5601) Window operator does not checkpoint watermarks
[ https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5601: - Fix Version/s: (was: 1.7.0) 1.8.0 > Window operator does not checkpoint watermarks > -- > > Key: FLINK-5601 > URL: https://issues.apache.org/jira/browse/FLINK-5601 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Ufuk Celebi >Assignee: Jiayi Liao >Priority: Critical > Fix For: 1.8.0 > > > During release testing [~stefanrichte...@gmail.com] and I noticed that > watermarks are not checkpointed in the window operator. > This can lead to non-determinism when restoring checkpoints. I was running an > adjusted {{SessionWindowITCase}} via Kafka for testing migration and > rescaling and ran into failures, because the data generator required > deterministic behaviour. > What happened was that on restore it could happen that late elements were not > dropped, because the watermarks needed to be re-established after restore > first. > [~aljoscha] Do you know whether there is a special reason for explicitly not > checkpointing watermarks? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6856) Eagerly check if serializer config snapshots are deserializable when snapshotting
[ https://issues.apache.org/jira/browse/FLINK-6856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656930#comment-16656930 ] Till Rohrmann commented on FLINK-6856: -- What's the state of this issue [~tzulitai]? Is it still valid? If so, shall we make it a subtask of FLINK-9376? > Eagerly check if serializer config snapshots are deserializable when > snapshotting > - > > Key: FLINK-6856 > URL: https://issues.apache.org/jira/browse/FLINK-6856 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0, 1.4.2, 1.5.1, 1.6.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.7.0 > > > Currently, if serializer config snapshots are not deserializable (for > example, if the user did not correctly include the deserialization empty > constructor, or the read / write methods are simply wrongly implemented), > users would only find this out when restoring from the snapshot. > We could eagerly do a check for this when snapshotting, and fail with a good > message indicating that the config snapshot can not be deserialized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-7915) Verify functionality of RollingSinkSecuredITCase
[ https://issues.apache.org/jira/browse/FLINK-7915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7915. -- Resolution: Won't Do Fix Version/s: (was: 1.5.6) (was: 1.6.3) (was: 1.7.0) Won't do since {{RollingSink}} is deprecated. > Verify functionality of RollingSinkSecuredITCase > > > Key: FLINK-7915 > URL: https://issues.apache.org/jira/browse/FLINK-7915 > Project: Flink > Issue Type: Task > Components: Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > > I recently stumbled across the test {{RollingSinkSecuredITCase}} which will > only be executed for Hadoop version {{>= 3}}. When trying to run it from > IntelliJ I immediately run into a class not found exception for > {{jdbm.helpers.CachePolicy}} and even after fixing this problem, the test > would not run because it complained about wrong security settings. > I think we should check whether this test is at all working and if not, then > we should remove or replace it with something working. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager
[ https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8871: - Fix Version/s: (was: 1.7.0) 1.8.0 > Checkpoint cancellation is not propagated to stop checkpointing threads on > the task manager > --- > > Key: FLINK-8871 > URL: https://issues.apache.org/jira/browse/FLINK-8871 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0 >Reporter: Stefan Richter >Priority: Critical > Fix For: 1.8.0 > > > Flink currently lacks any form of feedback mechanism from the job manager / > checkpoint coordinator to the tasks when it comes to failing a checkpoint. > This means that running snapshots on the tasks are also not stopped even if > their owning checkpoint is already cancelled. Two examples for cases where > this applies are checkpoint timeouts and local checkpoint failures on a task > together with a configuration that does not fail tasks on checkpoint failure. > Notice that those running snapshots do no longer account for the maximum > number of parallel checkpoints, because their owning checkpoint is considered > as cancelled. > Not stopping the task's snapshot thread can lead to a problematic situation > where the next checkpoints already started, while the abandoned checkpoint > thread from a previous checkpoint is still lingering around running. This > scenario can potentially cascade: many parallel checkpoints will slow down > checkpointing and make timeouts even more likely. > > A possible solution is introducing a {{cancelCheckpoint}} method as > counterpart to the {{triggerCheckpoint}} method in the task manager gateway, > which is invoked by the checkpoint coordinator as part of cancelling the > checkpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8871) Checkpoint cancellation is not propagated to stop checkpointing threads on the task manager
[ https://issues.apache.org/jira/browse/FLINK-8871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8871: - Affects Version/s: 1.7.0 > Checkpoint cancellation is not propagated to stop checkpointing threads on > the task manager > --- > > Key: FLINK-8871 > URL: https://issues.apache.org/jira/browse/FLINK-8871 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2, 1.4.1, 1.5.0, 1.6.0, 1.7.0 >Reporter: Stefan Richter >Priority: Critical > Fix For: 1.8.0 > > > Flink currently lacks any form of feedback mechanism from the job manager / > checkpoint coordinator to the tasks when it comes to failing a checkpoint. > This means that running snapshots on the tasks are also not stopped even if > their owning checkpoint is already cancelled. Two examples for cases where > this applies are checkpoint timeouts and local checkpoint failures on a task > together with a configuration that does not fail tasks on checkpoint failure. > Notice that those running snapshots do no longer account for the maximum > number of parallel checkpoints, because their owning checkpoint is considered > as cancelled. > Not stopping the task's snapshot thread can lead to a problematic situation > where the next checkpoints already started, while the abandoned checkpoint > thread from a previous checkpoint is still lingering around running. This > scenario can potentially cascade: many parallel checkpoints will slow down > checkpointing and make timeouts even more likely. > > A possible solution is introducing a {{cancelCheckpoint}} method as > counterpart to the {{triggerCheckpoint}} method in the task manager gateway, > which is invoked by the checkpoint coordinator as part of cancelling the > checkpoint. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
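The proposal in FLINK-8871 above can be sketched with a toy gateway. This is purely illustrative: the method names follow the proposal (a `cancelCheckpoint` counterpart to `triggerCheckpoint`), but the interface and the bookkeeping below are not the real Flink `TaskManagerGateway`; they only show how a cancellation callback lets the task drop an abandoned snapshot instead of letting it linger.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only, not Flink's actual gateway interface.
interface CheckpointGateway {
    void triggerCheckpoint(long checkpointId, long timestamp);
    void cancelCheckpoint(long checkpointId);
}

/** Toy in-memory gateway that tracks which snapshots are "running". */
class RecordingGateway implements CheckpointGateway {
    final Set<Long> running = new HashSet<>();

    @Override
    public void triggerCheckpoint(long checkpointId, long timestamp) {
        running.add(checkpointId);
    }

    @Override
    public void cancelCheckpoint(long checkpointId) {
        // Invoked by the coordinator when a checkpoint times out or fails,
        // so the task can stop its snapshot thread rather than let it
        // compete with the snapshots of subsequent checkpoints.
        running.remove(checkpointId);
    }
}
```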
[jira] [Updated] (FLINK-9002) Add operators with input type that goes through Avro serialization (with schema/generic)
[ https://issues.apache.org/jira/browse/FLINK-9002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9002: - Fix Version/s: 1.7.0 > Add operators with input type that goes through Avro serialization (with > schema/generic) > - > > Key: FLINK-9002 > URL: https://issues.apache.org/jira/browse/FLINK-9002 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8997) Add sliding window aggregation to the job
[ https://issues.apache.org/jira/browse/FLINK-8997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8997: - Fix Version/s: 1.7.0 > Add sliding window aggregation to the job > - > > Key: FLINK-8997 > URL: https://issues.apache.org/jira/browse/FLINK-8997 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > > The test job should also test windowing. Sliding windows are probably the > most demanding form, so this would be a good pick for the test. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8998) Add user-defined watermark assigner and timestamp extractor to the test job
[ https://issues.apache.org/jira/browse/FLINK-8998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8998: - Fix Version/s: 1.7.0 > Add user-defined watermark assigner and timestamp extractor to the test job > --- > > Key: FLINK-8998 > URL: https://issues.apache.org/jira/browse/FLINK-8998 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9003) Add operators with input type that goes through custom, stateful serialization
[ https://issues.apache.org/jira/browse/FLINK-9003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9003: - Fix Version/s: 1.7.0 > Add operators with input type that goes through custom, stateful serialization > -- > > Key: FLINK-9003 > URL: https://issues.apache.org/jira/browse/FLINK-9003 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9001) Add operators with input type that goes through Kryo serialization (registered/generic/custom)
[ https://issues.apache.org/jira/browse/FLINK-9001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9001: - Fix Version/s: 1.7.0 > Add operators with input type that goes through Kryo serialization > (registered/generic/custom) > --- > > Key: FLINK-9001 > URL: https://issues.apache.org/jira/browse/FLINK-9001 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8995) Add a test operator with keyed state that uses custom, stateful serializer
[ https://issues.apache.org/jira/browse/FLINK-8995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-8995: - Fix Version/s: 1.7.0 > Add a test operator with keyed state that uses custom, stateful serializer > -- > > Key: FLINK-8995 > URL: https://issues.apache.org/jira/browse/FLINK-8995 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.5.0 >Reporter: Stefan Richter >Priority: Major > Fix For: 1.7.0 > > > This test should figure out problems in places where multiple threads would > share the same serializer instead of properly duplicating it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9410: - Affects Version/s: 1.7.0 > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: zhangminglei >Priority: Critical > Fix For: 1.8.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-10543) Leverage efficient timer deletion in relational operators
[ https://issues.apache.org/jira/browse/FLINK-10543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-10543: --- Assignee: Hequn Cheng > Leverage efficient timer deletion in relational operators > - > > Key: FLINK-10543 > URL: https://issues.apache.org/jira/browse/FLINK-10543 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Hequn Cheng >Priority: Minor > > FLINK-9423 added support for efficient timer deletions. This feature is > available since Flink 1.6 and should be used by the relational operator of > SQL and Table API. > Currently, we use a few workarounds to handle situations when deleting timers > would be the better solution. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
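The difference between the workaround and the timer deletion mentioned in FLINK-10543 can be modeled with a toy timer registry. This is not Flink code: the `TreeSet` below merely stands in for registered timers, contrasting the pre-1.6 pattern (register a new timer on every update and remember which firing is "valid") with simply deleting the stale timer, as `TimerService#deleteEventTimeTimer` allows since Flink 1.6.

```java
import java.util.TreeSet;

// Toy model of per-key timer state, not a Flink operator.
class CleanupTimers {
    private final TreeSet<Long> registered = new TreeSet<>();
    private long validTimestamp = Long.MIN_VALUE;

    /** Old workaround: keep stale timers, remember the one real firing. */
    void scheduleWithWorkaround(long ts) {
        registered.add(ts);
        validTimestamp = ts; // earlier firings must be ignored on callback
    }

    /** New style: remove the previous timer, then register the new one. */
    void scheduleWithDeletion(long ts) {
        if (validTimestamp != Long.MIN_VALUE) {
            registered.remove(validTimestamp); // deleteEventTimeTimer analogue
        }
        registered.add(ts);
        validTimestamp = ts;
    }

    int pendingTimers() {
        return registered.size();
    }
}
```

With deletion, the state backend holds at most one pending timer per key, instead of one per update.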
[jira] [Updated] (FLINK-9410) Replace NMClient with NMClientAsync in YarnResourceManager
[ https://issues.apache.org/jira/browse/FLINK-9410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-9410: - Fix Version/s: (was: 1.7.0) 1.8.0 > Replace NMClient with NMClientAsync in YarnResourceManager > -- > > Key: FLINK-9410 > URL: https://issues.apache.org/jira/browse/FLINK-9410 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: zhangminglei >Priority: Critical > Fix For: 1.8.0 > > > Currently, the {{YarnResourceManager}} uses the synchronous {{NMClient}} > which is called from within the main thread of the {{ResourceManager}}. Since > these operations are blocking, we should replace the client with the > {{NMClientAsync}} and make the calls non blocking. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
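The blocking-to-async migration described in FLINK-9410 follows a standard pattern that can be sketched without the YARN API. The class below is hypothetical: the blocking method stands in for an `NMClient`-style call, and the wrapper shows the `NMClientAsync`-style shape — the call is handed to a dedicated executor so the ResourceManager's main thread is never blocked.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Illustrative only; not the Hadoop NMClient/NMClientAsync API.
class AsyncClientExample {
    private final Executor ioExecutor = Executors.newSingleThreadExecutor();

    /** Stands in for a blocking client call (e.g. starting a container). */
    String startContainerBlocking(String containerId) {
        return "started-" + containerId;
    }

    /** Async wrapper: returns immediately, work runs on ioExecutor. */
    CompletableFuture<String> startContainerAsync(String containerId) {
        return CompletableFuture.supplyAsync(
                () -> startContainerBlocking(containerId), ioExecutor);
    }
}
```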
[jira] [Resolved] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files
[ https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-9587. -- Resolution: Not A Problem > ContinuousFileMonitoringFunction crashes on short living files > -- > > Key: FLINK-9587 > URL: https://issues.apache.org/jira/browse/FLINK-9587 > Project: Flink > Issue Type: Bug > Components: FileSystem, Streaming, Streaming Connectors >Affects Versions: 1.5.0 > Environment: Flink 1.5 running as a standalone cluster. >Reporter: Andrei Shumanski >Priority: Critical > Fix For: 1.6.3, 1.7.0 > > > Hi, > > We use Flink to monitor a directory for new files. The filesystem is a MapR > Fuse mount that looks like a local FS. > The files are copied to the directory by another process that uses rsync > command. While a file is not completely written rsync creates a temporary > file with a name like ".file.txt.uM6MfZ" where the last extension is a random > string. > When the copying is done - file is renamed to the final name "file.txt". > > The bug is that Flink does not correctly handle this behavior and does not > take into account that files in the directory might be deleted. > > We are getting error traces: > {code:java} > java.io.FileNotFoundException: File > file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or > the user running Flink ('root') has insufficient permissions to access it. 
> at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at > org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} > In LocalFileSystem.listStatus(final Path f) we read the list of files in a > directory and then create LocalFileStatus object for each of the files. But a > file might be removed during the interval between these operations. > I do not see any option to handle this exception in our code. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
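The race described in FLINK-9587 — a file listed by `listStatus` vanishing before it is statted — can be tolerated as sketched below. This uses plain `java.nio`, not the Flink `LocalFileSystem` code from the trace, and only illustrates the principle: treat "file not found" during the per-entry stat as "skip this entry" rather than as a failure.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Illustrative directory listing that tolerates short-lived files.
class TolerantLister {
    static List<Path> listExisting(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path p : entries) {
                try {
                    Files.getLastModifiedTime(p); // the racy per-entry stat
                    result.add(p);
                } catch (NoSuchFileException e) {
                    // The file disappeared between listing and stat
                    // (e.g. an rsync temp file was renamed): skip it.
                }
            }
        }
        return result;
    }
}
```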
[jira] [Commented] (FLINK-9587) ContinuousFileMonitoringFunction crashes on short living files
[ https://issues.apache.org/jira/browse/FLINK-9587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656921#comment-16656921 ] Till Rohrmann commented on FLINK-9587: -- I think we can close this issue due to inactivity. > ContinuousFileMonitoringFunction crashes on short living files > -- > > Key: FLINK-9587 > URL: https://issues.apache.org/jira/browse/FLINK-9587 > Project: Flink > Issue Type: Bug > Components: FileSystem, Streaming, Streaming Connectors >Affects Versions: 1.5.0 > Environment: Flink 1.5 running as a standalone cluster. >Reporter: Andrei Shumanski >Priority: Critical > Fix For: 1.6.3, 1.7.0 > > > Hi, > > We use Flink to monitor a directory for new files. The filesystem is a MapR > Fuse mount that looks like a local FS. > The files are copied to the directory by another process that uses rsync > command. While a file is not completely written rsync creates a temporary > file with a name like ".file.txt.uM6MfZ" where the last extension is a random > string. > When the copying is done - file is renamed to the final name "file.txt". > > The bug is that Flink does not correctly handle this behavior and does not > take into account that files in the directory might be deleted. > > We are getting error traces: > {code:java} > java.io.FileNotFoundException: File > file:/mapr/landingarea/cId=2075/.file_00231.cpio.gz.uM6MfZ does not exist or > the user running Flink ('root') has insufficient permissions to access it. 
> at > org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:115) > at > org.apache.flink.core.fs.local.LocalFileSystem.listStatus(LocalFileSystem.java:177) > at > org.apache.flink.core.fs.SafetyNetWrapperFileSystem.listStatus(SafetyNetWrapperFileSystem.java:92) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:707) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.addFilesInDir(FileInputFormat.java:710) > at > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:591) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.getInputSplitsSortedByModTime(ContinuousFileMonitoringFunction.java:270) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.monitorDirAndForwardSplits(ContinuousFileMonitoringFunction.java:242) > at > org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:206) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748) > {code} > In LocalFileSystem.listStatus(final Path f) we read the list of files in a > directory and then create LocalFileStatus object for each of the files. But a > file might be removed during the interval between these operations. > I do not see any option to handle this exception in our code. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10368) 'Kerberized YARN on Docker test' instable
[ https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10368: -- Fix Version/s: 1.6.3 1.5.6 > 'Kerberized YARN on Docker test' instable > - > > Key: FLINK-10368 > URL: https://issues.apache.org/jira/browse/FLINK-10368 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Running Kerberized YARN on Docker test end-to-end test failed on an AWS > instance. The problem seems to be that the NameNode went into safe-mode due > to limited resources. > {code} > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-09-19 09:04:39,201 INFO org.apache.hadoop.security.UserGroupInformation > - Login successful for user hadoop-user using keytab file > /home/hadoop-user/hadoop-user.keytab > 2018-09-19 09:04:39,453 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > master.docker-hadoop-cluster-network/172.22.0.3:8032 > 2018-09-19 09:04:39,640 INFO org.apache.hadoop.yarn.client.AHSProxy > - Connecting to Application History server at > master.docker-hadoop-cluster-network/172.22.0.3:10200 > 2018-09-19 09:04:39,656 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-19 09:04:39,656 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-19 09:04:39,901 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=2000, > taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1} > 2018-09-19 09:04:40,286 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The > configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both > LOG4J and Logback configuration files. Please delete or rename one of them. > > The program finished with the following exception: > org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't > deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:420) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:259) > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120) > Caused by: org.apache.hadoop.hdfs.server.namenode.SafeModeException: Cannot > create > file/user/hadoop-user/.flink/application_1537266361291_0099/lib/slf4j-log4j12-1.7.7.jar. > Name node is in safe mode. > Resources are low on NN. 
Please add or free up more resources then turn off > safe mode manually. NOTE: If you turn off safe mode before adding resources, > the NN will immediately return to safe mode. Use "hdfs dfsadmin -safemode > leave" to turn safe mode off. > NamenodeHostName:master.docker-hadoop-cluster-network > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.newSafemodeException(FSNamesystem.java:1407) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1395) > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2278) > at > org.apache.hado
[jira] [Commented] (FLINK-10364) Test instability in NonHAQueryableStateFsBackendITCase#testMapState
[ https://issues.apache.org/jira/browse/FLINK-10364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656917#comment-16656917 ] Till Rohrmann commented on FLINK-10364: --- Ping [~kkl0u] > Test instability in NonHAQueryableStateFsBackendITCase#testMapState > --- > > Key: FLINK-10364 > URL: https://issues.apache.org/jira/browse/FLINK-10364 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.5.4, 1.6.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > {code} > Tests run: 12, Failures: 0, Errors: 1, Skipped: 1, Time elapsed: 14.365 sec > <<< FAILURE! - in > org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase > testMapState(org.apache.flink.queryablestate.itcases.NonHAQueryableStateFsBackendITCase) > Time elapsed: 0.253 sec <<< ERROR! > java.lang.NullPointerException: null > at > org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase.testMapState(AbstractQueryableStateTestBase.java:840) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > {code} > from https://api.travis-ci.org/v3/job/429797943/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10491) Deadlock during spilling data in SpillableSubpartition
[ https://issues.apache.org/jira/browse/FLINK-10491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656916#comment-16656916 ] Till Rohrmann commented on FLINK-10491: --- [~pnowojski] [~zjwang], is this something we want to change for Flink 1.7? > Deadlock during spilling data in SpillableSubpartition > --- > > Key: FLINK-10491 > URL: https://issues.apache.org/jira/browse/FLINK-10491 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.4, 1.6.1 >Reporter: Piotr Nowojski >Assignee: zhijiang >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > Originally reported here: > [https://lists.apache.org/thread.html/472c8f4a2711c5e217fadd9a88f8c73670218e7432bb81ba3f5076db@%3Cuser.flink.apache.org%3E] > Thread dump (from 1.5.3 version) showing two deadlocked threads, because they > are taking two locks in different order: > {noformat} > Thread-1 > "DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA > waiting for monitor entry > waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a > java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355) > - locked <0x2dfd> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203) > - locked <0x2da5> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193) > at > 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:745) > Thread-2 > "Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor > entry > java.lang.Thread.State: BLOCKED > blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 > waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release > lock on <0x2dfd> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261) > at > org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171) > at > org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106) > at > 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146) > at > org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117) > - locked <0x2dfb> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96) > - locked <0x2dfc> (a > org.apache.flink.runtime.io.network.partition.SpillableSubpartition) > at > org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNew
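The two thread dumps above show a classic lock-order inversion: Thread-1 holds the LocalBufferPool lock (<0x2dfd>) and waits for the subpartition's buffer queue (<0x2dfb>), while Thread-2 holds <0x2dfb> and waits for <0x2dfd>. A minimal sketch of the standard remedy, a single global lock acquisition order used by every code path (all names here are hypothetical stand-ins, not Flink's actual classes):

```java
// Sketch of the lock-order inversion behind FLINK-10491 and its usual fix:
// both code paths below acquire poolLock before subpartitionLock, so no
// wait-for cycle can form between the two threads.
public class LockOrderDemo {
    private static final Object poolLock = new Object();         // stands in for <0x2dfd>
    private static final Object subpartitionLock = new Object(); // stands in for <0x2dfb>
    private static int handledBuffers = 0;

    // Path 1 (analogous to releaseMemory -> spill): pool lock first.
    static void releaseMemory() {
        synchronized (poolLock) {
            synchronized (subpartitionLock) {
                handledBuffers++;
            }
        }
    }

    // Path 2 (analogous to add -> recycle): same order, NOT the reverse.
    static void addBufferConsumer() {
        synchronized (poolLock) {
            synchronized (subpartitionLock) {
                handledBuffers++;
            }
        }
    }

    public static int run() {
        handledBuffers = 0;
        Thread a = new Thread(() -> { for (int i = 0; i < 1000; i++) releaseMemory(); });
        Thread b = new Thread(() -> { for (int i = 0; i < 1000; i++) addBufferConsumer(); });
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handledBuffers; // 2000 if neither thread deadlocked
    }

    public static void main(String[] args) {
        System.out.println("buffers handled: " + run());
    }
}
```

If path 2 swapped the two `synchronized` blocks, the two threads could each grab one lock and block forever on the other, exactly as in the dump above.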
[jira] [Updated] (FLINK-10481) Wordcount end-to-end test in docker env unstable
[ https://issues.apache.org/jira/browse/FLINK-10481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10481: -- Fix Version/s: 1.5.6 > Wordcount end-to-end test in docker env unstable > > > Key: FLINK-10481 > URL: https://issues.apache.org/jira/browse/FLINK-10481 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The {{Wordcount end-to-end test in docker env}} fails sometimes on Travis > with the following problem: > {code} > Status: Downloaded newer image for java:8-jre-alpine > ---> fdc893b19a14 > Step 2/16 : RUN apk add --no-cache bash snappy > ---> [Warning] IPv4 forwarding is disabled. Networking will not work. > ---> Running in 4329ebcd8a77 > fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz > WARNING: Ignoring > http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: > temporary error (try again later) > fetch > http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz > WARNING: Ignoring > http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: > temporary error (try again later) > ERROR: unsatisfiable constraints: > bash (missing): > required by: world[bash] > snappy (missing): > required by: world[snappy] > The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero > code: 2 > {code} > https://api.travis-ci.org/v3/job/434909395/log.txt > It seems as if it is related to > https://github.com/gliderlabs/docker-alpine/issues/264 and > https://github.com/gliderlabs/docker-alpine/issues/279. > We might want to switch to a different base image to avoid these problems in > the future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10481) Wordcount end-to-end test in docker env unstable
[ https://issues.apache.org/jira/browse/FLINK-10481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-10481: -- Fix Version/s: 1.6.3 > Wordcount end-to-end test in docker env unstable > > > Key: FLINK-10481 > URL: https://issues.apache.org/jira/browse/FLINK-10481 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.5.6, 1.6.3, 1.7.0 > > > The {{Wordcount end-to-end test in docker env}} fails sometimes on Travis > with the following problem: > {code} > Status: Downloaded newer image for java:8-jre-alpine > ---> fdc893b19a14 > Step 2/16 : RUN apk add --no-cache bash snappy > ---> [Warning] IPv4 forwarding is disabled. Networking will not work. > ---> Running in 4329ebcd8a77 > fetch http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz > WARNING: Ignoring > http://dl-cdn.alpinelinux.org/alpine/v3.4/main/x86_64/APKINDEX.tar.gz: > temporary error (try again later) > fetch > http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz > WARNING: Ignoring > http://dl-cdn.alpinelinux.org/alpine/v3.4/community/x86_64/APKINDEX.tar.gz: > temporary error (try again later) > ERROR: unsatisfiable constraints: > bash (missing): > required by: world[bash] > snappy (missing): > required by: world[snappy] > The command '/bin/sh -c apk add --no-cache bash snappy' returned a non-zero > code: 2 > {code} > https://api.travis-ci.org/v3/job/434909395/log.txt > It seems as if it is related to > https://github.com/gliderlabs/docker-alpine/issues/264 and > https://github.com/gliderlabs/docker-alpine/issues/279. > We might want to switch to a different base image to avoid these problems in > the future. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10368) 'Kerberized YARN on Docker test' instable
[ https://issues.apache.org/jira/browse/FLINK-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656901#comment-16656901 ] Till Rohrmann commented on FLINK-10368: --- I encountered another problem: {code}The program finished with the following exception: org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:419) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:261) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:) Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.' at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:299) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:491) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:412) ... 9 more {code} Apparently, the cluster thinks that it has only {{0}} vcores. This might be a setup issue. 
> 'Kerberized YARN on Docker test' instable > - > > Key: FLINK-10368 > URL: https://issues.apache.org/jira/browse/FLINK-10368 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Priority: Critical > Labels: test-stability > Fix For: 1.7.0 > > > Running Kerberized YARN on Docker test end-to-end test failed on an AWS > instance. The problem seems to be that the NameNode went into safe-mode due > to limited resources. > {code} > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/home/hadoop-user/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/usr/local/hadoop-2.8.4/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] > 2018-09-19 09:04:39,201 INFO org.apache.hadoop.security.UserGroupInformation > - Login successful for user hadoop-user using keytab file > /home/hadoop-user/hadoop-user.keytab > 2018-09-19 09:04:39,453 INFO org.apache.hadoop.yarn.client.RMProxy > - Connecting to ResourceManager at > master.docker-hadoop-cluster-network/172.22.0.3:8032 > 2018-09-19 09:04:39,640 INFO org.apache.hadoop.yarn.client.AHSProxy > - Connecting to Application History server at > master.docker-hadoop-cluster-network/172.22.0.3:10200 > 2018-09-19 09:04:39,656 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-19 09:04:39,656 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli > - No path for the flink jar passed. 
Using the location of class > org.apache.flink.yarn.YarnClusterDescriptor to locate the jar > 2018-09-19 09:04:39,901 INFO > org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster > specification: ClusterSpecification{masterMemoryMB=2000, > taskManagerMemoryMB=2000, numberTaskManagers=3, slotsPerTaskManager=1} > 2018-09-19 09:04:40,286 WARN > org.apache.flink.yarn.AbstractYarnClusterDescriptor - The > configuration directory ('/home/hadoop-user/flink-1.6.1/conf') contains both > LOG4J and Logback configuration files. Please delete or rename one of them. > > The program finished with the following exception: > o
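The failure above comes from a client-side sanity check: the requested vcores per container are compared against the largest vcore count any NodeManager reports, and a cluster that (transiently) reports 0 vcores rejects every request. A hedged simplification of that comparison (hypothetical method, not the actual `isReadyForDeployment` code):

```java
// Simplified sketch of the vcore validation that produced the error above.
// If no NodeManager has registered yet, the cluster maximum can be 0 and
// even a request for a single vcore fails.
public class VcoreCheck {
    public static String validate(int requestedVcores, int maxClusterVcores) {
        if (requestedVcores > maxClusterVcores) {
            return "The number of requested virtual cores per node " + requestedVcores
                + " exceeds the maximum number of virtual cores " + maxClusterVcores
                + " available in the Yarn Cluster.";
        }
        return "ok";
    }

    public static void main(String[] args) {
        // Reproduces the situation from the log: 1 requested, 0 available.
        System.out.println(validate(1, 0));
    }
}
```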
[jira] [Commented] (FLINK-10482) java.lang.IllegalArgumentException: Negative number of in progress checkpoints
[ https://issues.apache.org/jira/browse/FLINK-10482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656892#comment-16656892 ] Gary Yao commented on FLINK-10482: -- While working on FLINK-10309, I discovered FLINK-10615, which can also cause the issue described here. If I recall correctly, the reason is that a pending checkpoint may be [aborted multiple times|https://github.com/apache/flink/blob/d2480af40f5145d865196a95ec58a415482d8cff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java#L512], which messes up the statistics. > java.lang.IllegalArgumentException: Negative number of in progress checkpoints > -- > > Key: FLINK-10482 > URL: https://issues.apache.org/jira/browse/FLINK-10482 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.6.1 >Reporter: Julio Biason >Priority: Major > Fix For: 1.8.0 > > > Recently I found the following log on my JobManager log: > {noformat} > 2018-10-02 17:44:50,090 [flink-akka.actor.default-dispatcher-4117] ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Implementation > error: Unhandled exception. 
> java.lang.IllegalArgumentException: Negative number of in progress > checkpoints > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.<init>(CheckpointStatsCounts.java:72) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsCounts.createSnapshot(CheckpointStatsCounts.java:177) > at > org.apache.flink.runtime.checkpoint.CheckpointStatsTracker.createSnapshot(CheckpointStatsTracker.java:166) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.getCheckpointStatsSnapshot(ExecutionGraph.java:553) > at > org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph.createFrom(ArchivedExecutionGraph.java:340) > at > org.apache.flink.runtime.jobmaster.JobMaster.requestJob(JobMaster.java:923) > at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) > at > akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at
akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >
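Gary's explanation, that a pending checkpoint aborted more than once decrements the in-progress count twice, can be illustrated with a toy counter (a hypothetical class, not the actual `CheckpointStatsCounts`/`PendingCheckpoint` code) together with the idempotent-discard guard that prevents the double decrement:

```java
// Toy model of the FLINK-10482 symptom: without the `discarded` guard, a
// second abort() of the same checkpoint would decrement numInProgress again
// and drive it negative, which later fails the Preconditions.checkArgument.
public class CheckpointCounts {
    private int numInProgress = 0;
    private boolean discarded = false; // guard: only the first abort counts

    public void start() {
        numInProgress++;
        discarded = false;
    }

    public void abort() {
        if (discarded) {
            return; // aborting the same checkpoint twice is a no-op
        }
        discarded = true;
        numInProgress--;
    }

    public int inProgress() {
        return numInProgress;
    }

    public static void main(String[] args) {
        CheckpointCounts counts = new CheckpointCounts();
        counts.start();
        counts.abort();
        counts.abort(); // without the guard this would make the count -1
        System.out.println("in progress: " + counts.inProgress());
    }
}
```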
[jira] [Commented] (FLINK-10601) Make user home dir consistent with Flink default filesystem
[ https://issues.apache.org/jira/browse/FLINK-10601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656883#comment-16656883 ] ASF GitHub Bot commented on FLINK-10601: link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem URL: https://github.com/apache/flink/pull/6887#issuecomment-431389044 I'd like to add some tests, but it seems that it's hard to expose the state of YARN containers in a UT, and writing an IT for this is probably not worth the effort. I might need some suggestions on this. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make user home dir consistent with Flink default filesystem > --- > > Key: FLINK-10601 > URL: https://issues.apache.org/jira/browse/FLINK-10601 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.6.1 >Reporter: Paul Lin >Priority: Major > Labels: pull-request-available > > Currently, Flink client uses the raw default Hadoop filesystem to upload > local files, and this could be problematic when using a non-default > filesystem for HA or checkpointing in delegation tokens scenarios. The > jobmanager only has the delegation tokens for the default FS, so it gets > authentication errors when trying to connect a non-default filesystem. > So I propose to replace the default FS property in yarn configuration with > the Flink filesystem property `fs.default-scheme` on the client side > (AbstractYarnClusterDescriptor), to avoid this problem and also make the > client behavior more configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem
link3280 commented on issue #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem URL: https://github.com/apache/flink/pull/6887#issuecomment-431389044 I'd like to add some tests, but it seems that it's hard to expose the state of YARN containers in a UT, and writing an IT for this is probably not worth the effort. I might need some suggestions on this. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10601) Make user home dir consistent with Flink default filesystem
[ https://issues.apache.org/jira/browse/FLINK-10601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656881#comment-16656881 ] ASF GitHub Bot commented on FLINK-10601: link3280 opened a new pull request #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem URL: https://github.com/apache/flink/pull/6887 ## What is the purpose of the change Currently, Flink client uses the raw default Hadoop filesystem to upload local files, and this could be problematic when using a non-default filesystem for HA or checkpointing in delegation tokens scenarios. The jobmanager only has the delegation tokens for the default FS, so it gets authentication errors when trying to connect a non-default filesystem. So I propose to make the default FS consistent with the Flink filesystem property fs.default-scheme (if it's set) on the client side (AbstractYarnClusterDescriptor), to avoid this problem and also make the client behavior more configurable. ## Brief change log - Replace the fs.defaultFS property in the yarn configuration with the initialized Flink filesystem's default scheme if it's not local. ## Verifying this change This change can be verified manually as follows: - Set the default FS in core-site.xml to a non-existent FS, and set the fs.default-scheme property to a valid one in flink-conf.yaml, then run the WordCount example to see if it works well. There would be errors while the client tries to upload the files to the remote system if things go wrong. 
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make user home dir consistent with Flink default filesystem > --- > > Key: FLINK-10601 > URL: https://issues.apache.org/jira/browse/FLINK-10601 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.6.1 >Reporter: Paul Lin >Priority: Major > Labels: pull-request-available > > Currently, Flink client uses the raw default Hadoop filesystem to upload > local files, and this could be problematic when using a non-default > filesystem for HA or checkpointing in delegation tokens scenarios. The > jobmanager only has the delegation tokens for the default FS, so it gets > authentication errors when trying to connect a non-default filesystem. > So I propose to replace the default FS property in yarn configuration with > the Flink filesystem property `fs.default-scheme` on the client side > (AbstractYarnClusterDescriptor), to avoid this problem and also make the > client behavior more configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] link3280 opened a new pull request #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem
link3280 opened a new pull request #6887: [FLINK-10601] [YARN] Make user home dir consistent with Flink default filesystem URL: https://github.com/apache/flink/pull/6887 ## What is the purpose of the change Currently, Flink client uses the raw default Hadoop filesystem to upload local files, and this could be problematic when using a non-default filesystem for HA or checkpointing in delegation tokens scenarios. The jobmanager only has the delegation tokens for the default FS, so it gets authentication errors when trying to connect a non-default filesystem. So I propose to make the default FS consistent with the Flink filesystem property fs.default-scheme (if it's set) on the client side (AbstractYarnClusterDescriptor), to avoid this problem and also make the client behavior more configurable. ## Brief change log - Replace the fs.defaultFS property in the yarn configuration with the initialized Flink filesystem's default scheme if it's not local. ## Verifying this change This change can be verified manually as follows: - Set the default FS in core-site.xml to a non-existent FS, and set the fs.default-scheme property to a valid one in flink-conf.yaml, then run the WordCount example to see if it works well. There would be errors while the client tries to upload the files to the remote system if things go wrong. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
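The change the PR describes, mirroring Flink's `fs.default-scheme` into the YARN configuration's `fs.defaultFS` unless it points at the local filesystem, can be sketched as follows. This is a hedged illustration using a plain `Map` as a stand-in for the Hadoop/Flink `Configuration` objects; the method and key names other than `fs.defaultFS` and `fs.default-scheme` are hypothetical:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Sketch of the idea in PR #6887: align the YARN default FS with Flink's
// configured default scheme so client uploads and the delegation tokens
// target the same filesystem. Not the actual AbstractYarnClusterDescriptor code.
public class DefaultFsAlignment {
    public static Map<String, String> align(Map<String, String> yarnConf, String flinkDefaultScheme) {
        if (flinkDefaultScheme == null) {
            return yarnConf; // fs.default-scheme not set on the Flink side
        }
        URI scheme = URI.create(flinkDefaultScheme);
        if (!"file".equals(scheme.getScheme())) { // skip local filesystems
            yarnConf.put("fs.defaultFS", flinkDefaultScheme);
        }
        return yarnConf;
    }

    public static void main(String[] args) {
        Map<String, String> yarnConf = new HashMap<>();
        yarnConf.put("fs.defaultFS", "hdfs://defaultNameservice"); // hypothetical cluster value
        align(yarnConf, "hdfs://haNameservice");                   // hypothetical fs.default-scheme
        System.out.println(yarnConf.get("fs.defaultFS"));
    }
}
```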
[jira] [Resolved] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10275. --- Resolution: Won't Fix Fix Version/s: (was: 1.7.0) > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: TisonKun >Priority: Major > Labels: pull-request-available > > StreamTask should support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful not to keep any objects as state and not to modify the objects. > *FYI, now this thread is blocked by FLIP-21. The pull request attached is > closed for a needed rework.* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10275) StreamTask support object reuse
[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10275: --- > StreamTask support object reuse > --- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.7.0 >Reporter: TisonKun >Priority: Major > Labels: pull-request-available > > StreamTask should support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful not to keep any objects as state and not to modify the objects. > *FYI, now this thread is blocked by FLIP-21. The pull request attached is > closed for a needed rework.* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
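The caveat in the issue description, that with object reuse enabled operators must not keep references to records or mutate them, can be demonstrated with a toy reusing source (all types here are hypothetical, not Flink's runtime classes):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrates the FLINK-10275 caveat: a reusing runtime hands every record
// through the same mutable instance, so user code that keeps the reference
// (instead of a copy) sees only the last record's value.
public class ObjectReuseDemo {
    static final class Record {
        int value;
        Record(int value) { this.value = value; }
        Record copy() { return new Record(value); }
    }

    // Simulates a reusing source: one Record instance, mutated per element.
    public static List<Integer> collect(int[] input, boolean copyBeforeKeeping) {
        Record reused = new Record(0);
        List<Record> kept = new ArrayList<>();
        for (int v : input) {
            reused.value = v; // the runtime reuses this object for every record
            kept.add(copyBeforeKeeping ? reused.copy() : reused);
        }
        List<Integer> values = new ArrayList<>();
        for (Record r : kept) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3};
        System.out.println("with copies:  " + collect(input, true));
        System.out.println("keeping refs: " + collect(input, false));
    }
}
```

Keeping copies preserves all three values, while keeping references yields the last value three times; this is why reuse trades GC pressure for stricter rules on user code.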
[jira] [Commented] (FLINK-10605) Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot interface
[ https://issues.apache.org/jira/browse/FLINK-10605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656874#comment-16656874 ] ASF GitHub Bot commented on FLINK-10605: igalshilman commented on a change in pull request #6881: [FLINK-10605] [core] Upgrade AvroSerializer snapshot to implement new TypeSerializerSnapshot interface URL: https://github.com/apache/flink/pull/6881#discussion_r226672698 ## File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializerSnapshot.java ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.typeutils; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; +import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility; + +import java.io.IOException; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An {@code Avro} specific implementation of a {@link TypeSerializerSnapshot}. + * + * @param <T> The data type that the originating serializer of this configuration serializes. + */ +public final class AvroSerializerSnapshot<T> implements TypeSerializerSnapshot<T> { + private Class<T> runtimeType; + private Schema schema; + + @SuppressWarnings("WeakerAccess") + public AvroSerializerSnapshot() { + // this constructor is used when restoring from a checkpoint. 
+ } + + AvroSerializerSnapshot(Schema schema, Class<T> runtimeType) { + this.schema = schema; + this.runtimeType = runtimeType; + } + + @Override + public int getCurrentVersion() { + return 1; + } + + @Override + public void write(DataOutputView out) throws IOException { + checkNotNull(runtimeType); + checkNotNull(schema); + + out.writeUTF(runtimeType.getName()); + out.writeUTF(schema.toString(false)); + } + + @Override + public void read(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { + final String previousRuntimeClassName = in.readUTF(); + final String previousSchemaDefinition = in.readUTF(); + + this.runtimeType = findClassOrThrow(userCodeClassLoader, previousRuntimeClassName); + this.schema = parseAvroSchema(previousSchemaDefinition); + } + + @Override + public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> + resolveSchemaCompatibility(NS newSerializer) { + if (!(newSerializer instanceof AvroSerializer)) { + return TypeSerializerSchemaCompatibility.incompatible(); + } + AvroSerializer<T> newAvroSerializer = (AvroSerializer<T>) newSerializer; + return resolveSchemaCompatibility(schema, newAvroSerializer.getAvroSchema()); + } + + @Override + public TypeSerializer<T> restoreSerializer() { + checkNotNull(runtimeType); + checkNotNull(schema); + + if (AvroSerializer.isGenericRecord(runtimeType)) { + return new AvroSerializer<>(runtimeType, schema); + } + else { + return new AvroSerializer<>(runtimeType); + } + } + + // + // Helpers + // + + /** +* Resolves writer/reader schema compatibly. +* +* Checks whenever a new version of