[jira] [Created] (FLINK-11094) Restored state in RocksDBStateBackend that has not been accessed in new execution causes NPE on snapshot
Tzu-Li (Gordon) Tai created FLINK-11094: --- Summary: Restored state in RocksDBStateBackend that has not been accessed in new execution causes NPE on snapshot Key: FLINK-11094 URL: https://issues.apache.org/jira/browse/FLINK-11094 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.7.0 Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.7.1

This was caused by the changes in FLINK-10679. The problem is that with that change, in the {{RocksDBKeyedBackend}}, {{RegisteredStateMetaInfoBase}}s are no longer created eagerly for all restored state, but only lazily once the state is accessed again by the user. As a result, restored state that has not been accessed has empty meta info, and an NPE is thrown when trying to take a snapshot of it.

The rationale behind FLINK-10679 was that, since {{RegisteredStateMetaInfoBase}} already holds serializer instances for state access, creating it eagerly at restore time from the restored serializer snapshots did not make sense (at that point in time we do not yet have the new serializers for state access; the snapshot is only capable of creating the previous state serializer).

I propose the following: instead of holding final {{TypeSerializer}} instances, {{RegisteredStateMetaInfoBase}}s should hold a {{StateSerializerProvider}}. The {{StateSerializerProvider}} would have the following methods:

{code}
public class StateSerializerProvider<T> {
    TypeSerializer<T> getCurrentSerializer();
    TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer);
    TypeSerializer<T> getPreviousSerializer();
}
{code}

A {{StateSerializerProvider}} can be created from either:
1) a restored serializer snapshot, when restoring the state, or
2) a fresh, new state's serializer, when registering the state for the first time.

For 1), state that has not been accessed after the restore will return the same serializer (i.e. the previous serializer) for both {{getPreviousSerializer}} and {{getCurrentSerializer}}. Once a restored state is re-accessed, {{updateCurrentSerializer(TypeSerializer newSerializer)}} should be used to update which serializer the provider returns from {{getCurrentSerializer}}.

We could also use this new abstraction to move some of the new serializer's compatibility checks out of the state backend and into {{StateSerializerProvider#updateCurrentSerializer}}.

Regarding tests, we apparently lack coverage for restored state that is snapshotted again without having been accessed. This should be added as part of the fix.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
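For illustration, a minimal sketch of how such a provider could behave; the class name, factory methods, and the lazy fall-back to the previous serializer are assumptions for the example, not the actual Flink implementation.

{code:java}
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

/** Illustrative sketch only; names and structure are assumptions, not Flink's actual class. */
public final class LazyStateSerializerProvider<T> {

    private final TypeSerializer<T> previousSerializer; // null for freshly registered state
    private TypeSerializer<T> currentSerializer;        // null until restored state is re-accessed

    private LazyStateSerializerProvider(TypeSerializer<T> previous, TypeSerializer<T> current) {
        this.previousSerializer = previous;
        this.currentSerializer = current;
    }

    /** Case 1): restored state; only the previous serializer (from the snapshot) is known. */
    public static <T> LazyStateSerializerProvider<T> fromRestoredSnapshot(TypeSerializerSnapshot<T> snapshot) {
        return new LazyStateSerializerProvider<>(snapshot.restoreSerializer(), null);
    }

    /** Case 2): freshly registered state; the new serializer is known immediately. */
    public static <T> LazyStateSerializerProvider<T> fromNewSerializer(TypeSerializer<T> serializer) {
        return new LazyStateSerializerProvider<>(null, serializer);
    }

    /** Falls back to the previous serializer until restored state is re-accessed, so snapshots never see null. */
    public TypeSerializer<T> getCurrentSerializer() {
        return currentSerializer != null ? currentSerializer : previousSerializer;
    }

    public TypeSerializer<T> getPreviousSerializer() {
        return previousSerializer;
    }

    /** Called when restored state is re-accessed with the new serializer for the new execution. */
    public TypeSerializer<T> updateCurrentSerializer(TypeSerializer<T> newSerializer) {
        // Compatibility checks against previousSerializer could be moved here, as proposed above.
        this.currentSerializer = newSerializer;
        return currentSerializer;
    }
}
{code}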
[jira] [Created] (FLINK-11093) Migrate flink-table runtime Function classes
xueyu created FLINK-11093: - Summary: Migrate flink-table runtime Function classes Key: FLINK-11093 URL: https://issues.apache.org/jira/browse/FLINK-11093 Project: Flink Issue Type: New Feature Reporter: xueyu Assignee: xueyu As discussed in [FLINK-11065|https://issues.apache.org/jira/browse/FLINK-11065], this is a subtask which migrates the flink-table {{org.apache.flink.table.runtime.\*Function.scala}} classes to Java in the flink-table-runtime module -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11092) Migrate flink-table runtime Selector and Collector classes
xueyu created FLINK-11092: - Summary: Migrate flink-table runtime Selector and Collector classes Key: FLINK-11092 URL: https://issues.apache.org/jira/browse/FLINK-11092 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: xueyu Assignee: xueyu As discussed in [FLINK-11065|https://issues.apache.org/jira/browse/FLINK-11065], this is a subtask which migrates the flink-table CRowKeySelector, RowKeySelector, CRowWrappingCollector, and TableFunctionCollector classes to Java in the flink-table-runtime module -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Discuss [FLINK-9740] Support group windows over intervals of months
Hi Timo Walther, I have redesigned and sorted it out. Please find the details in the attachment; I have also added them to the JIRA: https://issues.apache.org/jira/browse/FLINK-9740 Thanks, qianjin
[jira] [Created] (FLINK-11090) Unused parameter in WindowedStream.aggregate()
Hequn Cheng created FLINK-11090: --- Summary: Unused parameter in WindowedStream.aggregate() Key: FLINK-11090 URL: https://issues.apache.org/jira/browse/FLINK-11090 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Hequn Cheng Assignee: Hequn Cheng

The {{aggregateResultType}} parameter in {{WindowedStream.aggregate()}} appears to be unused. Or is there something I have missed? If it is indeed unused, I would prefer to remove the parameter by adding a new API and deprecating the current one. We can't remove it directly as the method is {{PublicEvolving}}.

{code:java}
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
        AggregateFunction<T, ACC, V> aggregateFunction,
        ProcessWindowFunction<V, R, K, W> windowFunction,
        TypeInformation<ACC> accumulatorType,
        TypeInformation<V> aggregateResultType,
        TypeInformation<R> resultType) { }
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11091) Clear the use of deprecated methods of KeyedStream in table operators
sunjincheng created FLINK-11091: --- Summary: Clear the use of deprecated methods of KeyedStream in table operators Key: FLINK-11091 URL: https://issues.apache.org/jira/browse/FLINK-11091 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.7.0, 1.6.2, 1.5.5 Reporter: sunjincheng The `KeyedStream#process(ProcessFunction)` method has been deprecated since FLINK-8560, so it would be good to use `KeyedStream#process(KeyedProcessFunction)` to implement the `DataStreamSort`, `DataStreamGroupAggregate`, and `DataStreamOverAggregate` operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
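For illustration, a minimal sketch of the API involved; the event type and the counting logic are made up for the example, only the {{KeyedProcessFunction}} shape is the relevant part.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical event type, just for the example.
class MyEvent {
    String key;
    long value;
}

// KeyedProcessFunction<KEY, IN, OUT> is the non-deprecated replacement for using
// ProcessFunction on a KeyedStream; it gives typed access to the current key.
class CountPerKey extends KeyedProcessFunction<String, MyEvent, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(MyEvent event, Context ctx, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        out.collect(updated);
    }
}

// Usage: events.keyBy(e -> e.key).process(new CountPerKey());
{code}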
Re: Apply for flink contributor permission
Hi, You now have contributor permissions and can assign JIRAs you would like to work on to yourself. Welcome to the community! Cheers, Gordon

On Fri, Dec 7, 2018, 11:17 AM sf lee wrote:
> Hi there,
> Could anyone kindly give me the contributor permission?
> My JIRA id is xleesf.
>
> Thanks,
>
> xleesf
>
Apply for flink contributor permission
Hi there, Could anyone kindly give me the contributor permission? My JIRA id is xleesf. Thanks, xleesf
[jira] [Created] (FLINK-11089) Log filecache directory removed messages
liuzhaokun created FLINK-11089: -- Summary: Log filecache directory removed messages Key: FLINK-11089 URL: https://issues.apache.org/jira/browse/FLINK-11089 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.7.0 Reporter: liuzhaokun When the taskmanager exits or shuts down, the filecache directory named "flink-dist-cache*" is removed, but there is no log message about this action. So I think we should log it, so that users can check it easily when debugging. IOManager.java already logs such removal messages when the taskmanager shuts down; the filecache can do the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
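A minimal sketch of the kind of log statement meant here; the helper method and the use of commons-io are assumptions for the example, not the actual FileCache shutdown code.

{code:java}
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FileCacheCleanupSketch {

    private static final Logger LOG = LoggerFactory.getLogger(FileCacheCleanupSketch.class);

    // Hypothetical cleanup helper: delete a "flink-dist-cache*" storage directory and log the
    // removal, mirroring how IOManager logs the removal of its spill directories on shutdown.
    static void deleteStorageDirectory(File storageDir) {
        try {
            FileUtils.deleteDirectory(storageDir);
            LOG.info("FileCache removed storage directory {}", storageDir.getAbsolutePath());
        } catch (IOException e) {
            LOG.warn("FileCache failed to remove storage directory {}", storageDir.getAbsolutePath(), e);
        }
    }
}
{code}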
dev apply
dev apply
Re: delay one of the datastream when performing join operation on event-time and watermark
Hi Rakesh Kumar, I think you can use the interval join, e.g.:

orderStream
    .keyBy(...)
    .intervalJoin(invoiceStream.keyBy(...))
    .between(Time.minutes(-5), Time.minutes(5))

The semantics of the interval join and a detailed usage description can be found at https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/joining.html#interval-join

Hope this helps, and any feedback is welcome!

Bests, Jincheng

Rakesh Kumar wrote on Thu, Dec 6, 2018 at 7:10 PM:
> Hi,
> I have two data sources, one for order data and another one for invoice data;
> these two I am pushing into Kafka topics in JSON form. I wanted to delay the
> order data by 5 mins because invoice data comes only after order data is
> generated. So, for that I have written a Flink program which takes these two
> streams from Kafka, applies watermarks, and delays the order data by 5 mins.
> After applying watermarks on these data, I wanted to join them based on the
> order_id which is present in both the order and invoice data. After joining I
> wanted to push the result to Kafka in a different topic.
>
> But I am not able to join these data streams with a 5 min delay and I am
> not able to figure it out.
>
> I am attaching my Flink program below and its dependencies.
>
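As a fuller illustration of the suggestion above, a self-contained sketch of an interval join between the two streams; the Order/Invoice types, field names, and join output are made up for the example, only the keyBy/intervalJoin/between/process chain is the relevant API. Both streams still need event-time timestamps and watermarks assigned upstream.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class OrderInvoiceIntervalJoin {

    // Hypothetical event types, just for the example.
    public static class Order {
        public String orderId;
        public long amount;
    }

    public static class Invoice {
        public String orderId;
        public String invoiceId;
    }

    public static DataStream<String> join(DataStream<Order> orders, DataStream<Invoice> invoices) {
        return orders
                .keyBy(o -> o.orderId)
                .intervalJoin(invoices.keyBy(i -> i.orderId))
                // an invoice is expected within 5 minutes before or after the matching order
                .between(Time.minutes(-5), Time.minutes(5))
                .process(new ProcessJoinFunction<Order, Invoice, String>() {
                    @Override
                    public void processElement(Order order, Invoice invoice, Context ctx, Collector<String> out) {
                        out.collect(order.orderId + " -> " + invoice.invoiceId);
                    }
                });
    }
}
{code}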
[jira] [Created] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
Rong Rong created FLINK-11088: - Summary: Improve Kerberos Authentication using Keytab in YARN proxy user mode Key: FLINK-11088 URL: https://issues.apache.org/jira/browse/FLINK-11088 Project: Flink Issue Type: Improvement Components: YARN Reporter: Rong Rong Currently flink-yarn assumes the keytab is shipped from the client side as a local resource in the application master environment and is then distributed to all the TMs. This does not work in YARN proxy user mode, since the proxy user or super user does not have access to the actual user's keytab but only to delegation tokens. We propose to make the keytab file path discovery configurable depending on the launch mode of the YARN client. Reference: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html -- This message was sent by Atlassian JIRA (v7.6.3#76005)
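A minimal sketch of the idea, under assumptions: the helper class, the launch-mode enum, and its values are hypothetical illustrations of "keytab path discovery depending on launch mode", not an actual Flink or Hadoop API.

{code:java}
import java.util.Optional;

public final class KeytabDiscoverySketch {

    // Hypothetical launch modes of the YARN client.
    public enum LaunchMode { CLIENT_KEYTAB, PROXY_USER }

    /** Returns the keytab path to ship as a local resource, or empty if only delegation tokens are available. */
    public static Optional<String> keytabToShip(LaunchMode mode, String configuredKeytabPath) {
        switch (mode) {
            case CLIENT_KEYTAB:
                // Current behaviour: the client's keytab is shipped to the AM and distributed to the TMs.
                return Optional.ofNullable(configuredKeytabPath);
            case PROXY_USER:
                // A proxy/super user cannot read the end user's keytab; rely on delegation tokens instead.
                return Optional.empty();
            default:
                throw new IllegalArgumentException("Unknown launch mode: " + mode);
        }
    }
}
{code}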
Re: Apply for permission to solve flink's jira issues
Hi, welcome to the Flink community. If you give me your JIRA username, I can give you contributor permissions. Thanks, Timo

On 06.12.18 at 12:12, shen lei wrote:
Hi All, Could you give me the permission to work on Flink's JIRA issues? I am interested in Flink, and I want to find some easy JIRA issues to study Flink. If possible, I hope to make some contributions to Flink. At the same time, I could learn Flink more deeply. Thank you. Best wishes, Lei Shen
[jira] [Created] (FLINK-11086) flink-hadoop-compatibility tests fail for 3.x hadoop versions
Sebastian Klemke created FLINK-11086: Summary: flink-hadoop-compatibility tests fail for 3.x hadoop versions Key: FLINK-11086 URL: https://issues.apache.org/jira/browse/FLINK-11086 Project: Flink Issue Type: Bug Components: YARN Reporter: Sebastian Klemke

All builds used Maven 3.2.5 on commit ed8ff14ed39d08cd319efe75b40b9742a2ae7558. Attempted builds:
- mvn clean install -Dhadoop.version=3.0.3
- mvn clean install -Dhadoop.version=3.1.1

Integration tests with the Hadoop input format datasource fail. Example stack trace, taken from the hadoop.version 3.1.1 build:

{code:java}
testJobCollectionExecution(org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase)  Time elapsed: 0.275 sec  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/flink/hadoop/shaded/com/google/re2j/PatternSyntaxException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.fs.Globber.doGlob(Globber.java:210)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:149)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:2085)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:269)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:239)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:325)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:150)
    at org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:58)
    at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:225)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:219)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:155)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeUnaryOperator(CollectionExecutor.java:229)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:149)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
    at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:182)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:158)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:131)
    at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:115)
    at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:38)
    at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:52)
    at org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.internalRun(WordCountMapredITCase.java:121)
    at org.apache.flink.test.hadoopcompatibility.mapred.WordCountMapredITCase.testProgram(WordCountMapredITCase.java:71)
{code}

Maybe hadoop 3.x versions could be added to the test matrix as well?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11087) Broadcast state migration Incompatibility from 1.5.3 to 1.7.0
Edward Rojas created FLINK-11087: Summary: Broadcast state migration Incompatibility from 1.5.3 to 1.7.0 Key: FLINK-11087 URL: https://issues.apache.org/jira/browse/FLINK-11087 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.7.0 Environment: Migration from Flink 1.5.3 to Flink 1.7.0 Reporter: Edward Rojas

When upgrading from Flink 1.5.3 to Flink 1.7.0, the migration of broadcast state throws the following error:

{noformat}
org.apache.flink.util.StateMigrationException: The new key serializer for broadcast state must not be incompatible.
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:238)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.open(CoBroadcastWithNonKeyedOperator.java:87)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:745)
{noformat}

The broadcast state uses a MapState with StringSerializer as the key serializer and a custom JsonSerializer as the value serializer. There were no changes to the TypeSerializers used, only the version upgrade.

With some debugging I see that, at the moment the compatibility of states is validated in the DefaultOperatorStateBackend class, the "*registeredBroadcastStates*" holding the data about the 'old' state contains the wrong association of key and value serializer: JsonSerializer appears as the key serializer and StringSerializer as the value serializer (when it should be the other way around).

After more digging, I see that the "OperatorBackendStateMetaInfoReaderV2V3" class is responsible for this swap, here: https://github.com/apache/flink/blob/release-1.7/flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/LegacyStateMetaInfoReaders.java#L165

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
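For context, a minimal sketch of the kind of broadcast state setup described; the descriptor name is made up and the reporter's custom JsonSerializer is represented by a generic value serializer parameter, so this only illustrates which key/value association is reported to get swapped on restore.

{code:java}
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class BroadcastStateDescriptorSketch {

    // valueSerializer stands in for the reporter's custom JsonSerializer.
    public static <V> MapStateDescriptor<String, V> descriptor(TypeSerializer<V> valueSerializer) {
        // Key serializer: StringSerializer; value serializer: the custom one.
        // The report is that, after restoring a 1.5.3 savepoint in 1.7.0, the old state
        // meta info ends up with these two associations swapped.
        return new MapStateDescriptor<>("broadcast-state", StringSerializer.INSTANCE, valueSerializer);
    }
}
{code}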
[jira] [Created] (FLINK-11085) flink-s3-fs-presto
Chesnay Schepler created FLINK-11085: Summary: flink-s3-fs-presto Key: FLINK-11085 URL: https://issues.apache.org/jira/browse/FLINK-11085 Project: Flink Issue Type: Bug Components: FileSystem Affects Versions: 1.7.0 Reporter: Chesnay Schepler

A user has reported an issue on the ML where using the presto-s3 filesystem fails with an exception due to a missing class. The missing class is indeed filtered out in the shade-plugin configuration.

{code:java}
java.lang.NoClassDefFoundError: org/apache/flink/fs/s3presto/shaded/com/facebook/presto/hadoop/HadoopFileStatus
    at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:446)
    at org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hive.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:423)
    at org.apache.flink.fs.s3.common.hadoop.HadoopFileSystem.delete(HadoopFileSystem.java:147)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:80)
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.doDiscard(CompletedCheckpoint.java:250)
    at org.apache.flink.runtime.checkpoint.CompletedCheckpoint.discardOnSubsume(CompletedCheckpoint.java:219)
    at org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore.addCheckpoint(StandaloneCompletedCheckpointStore.java:72)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:844)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:756)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$acknowledgeCheckpoint$8(JobMaster.java:680)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
Apply for permission to solve flink's jira issues
Hi All, Could you give me the permission to work on Flink's JIRA issues? I am interested in Flink, and I want to find some easy JIRA issues to study Flink. If possible, I hope to make some contributions to Flink. At the same time, I could learn Flink more deeply. Thank you. Best wishes, Lei Shen
delay one of the datastream when performing join operation on event-time and watermark
Hi, I have two data sources, one for order data and another one for invoice data; these two I am pushing into Kafka topics in JSON form. I wanted to delay the order data by 5 mins because invoice data comes only after order data is generated. So, for that I have written a Flink program which takes these two streams from Kafka, applies watermarks, and delays the order data by 5 mins. After applying watermarks on these data, I wanted to join them based on the order_id which is present in both the order and invoice data. After joining I wanted to push the result to Kafka in a different topic. But I am not able to join these data streams with a 5 min delay and I am not able to figure it out. I am attaching my Flink program below and its dependencies.

[pom.xml attachment; the XML markup was lost in the archive. It declares the com.flink.streaming:flinkJoin:0.0.1-SNAPSHOT project with dependencies on slf4j-log4j12 1.7.5, flink-connector-kafka_2.11 1.7.0, flink-streaming-java_2.11 1.7.0, flink-streaming-scala_2.11 1.7.0 (each excluding slf4j-log4j12 and log4j), org.json 20180813, flink-clients_2.11 1.7.0, flink-runtime_2.11 1.7.0, and the maven-assembly-plugin 2.3 building from src/main/assembly/default.xml in the package phase.]
Re: [DISCUSS] Flink SQL DDL Design
Hi everyone, great to have such a lively discussion. My next batch of feedback:

@Jark: We don't need to align the descriptor approach with SQL. I'm open to different approaches as long as we can serve a broad set of use cases and systems. The descriptor approach was a first attempt to cover all aspects and connector/format characteristics. Just another example that is missing in the DDL design: how can a user decide whether append, retraction, or upserts should be used to sink data into the target system? Do we want to define all these important properties in the big WITH property map? If yes, we are already close to the descriptor approach. Regarding the "standard way", most DDL languages have very custom syntax, so there is not a real "standard".

3. Sources/Sinks: @Lin: If a table has both read/write access it can be created using a regular CREATE TABLE (omitting a specific source/sink) declaration. Regarding the transition from source/sink to both, yes, we would need to update the DDL and catalogs. But is this a problem? One also needs to add new queries that use the tables. @Xuefu: It is not only about security aspects. Especially for streaming use cases, not every connector can be used as a source easily. For example, a JDBC sink is easier than a JDBC source. Let's assume an interactive CLI session; people should be able to list all source tables and sink tables to know upfront whether they can use an INSERT INTO here or not.

6. Partitioning and keys: @Lin: I would like to include this in the design given that Hive integration and Kafka key support are in the making/on our roadmap for this release.

5. Schema declaration: @Lin: You are right, it is not conflicting. I just wanted to raise the point because if users want to declare computed columns they have "schema" constraints but without columns. Are we ok with a syntax like ... CREATE TABLE (PRIMARY_KEY(a, c)) WITH (format.type = avro, format.schema-file = "/my/avrofile.avsc") ? @Xuefu: Yes, you are right that an external schema might not exactly match, but this is true for both directions: table schema "derives" format schema and format schema "derives" table schema.

7. Hive compatibility: @Xuefu: I agree that Hive is popular, but we should not just adopt everything from Hive as their syntax is very batch-specific. We should come up with a superset of historical and future requirements. Supporting Hive queries can be an intermediate layer on top of Flink's DDL.

4. Time attributes: @Lin: I'm fine with changing the TimestampExtractor interface as this is also important for better separation of the connector and table modules [1]. However, I'm wondering about watermark generation.

4a. Timestamps are in the schema twice: @Jark: "existing field is Long/Timestamp, we can just use it as rowtime": yes, but we need to mark a field as such an attribute. What would the syntax for marking it look like? And what about timestamps that are nested in the schema?

4b. How can we write out a timestamp into the message header?: I agree to simply ignore computed columns when writing out. This is like the 'field-change: add' that I mentioned in the improvements document. @Jark: "then the timestmap in StreamRecord will be write to Kafka message header": Unfortunately, there is no timestamp in the stream record. Additionally, multiple time attributes can be in a schema. So we need a constraint that tells the sink which column to use (possibly computed as well)?

4c. Separate all time attribute concerns into a special clause next to the regular schema?
@Jark: I don't have a strong opinion on this. I just have the feeling that the "schema part" becomes quite messy, because the actual schema with types and fields is accompanied by so much metadata about timestamps, watermarks, keys, ... and we would need to introduce a new WATERMARK keyword within a schema that was close to standard up to this point.

Thanks everyone, Timo

[1] https://issues.apache.org/jira/browse/FLINK-9461

On 06.12.18 at 07:08, Jark Wu wrote:

Hi Timo, Thank you for the valuable feedback. First of all, I think we don't need to align the SQL functionality with the Descriptor. Because SQL is a more standard API, we should be as cautious as possible when extending the SQL syntax. If something can be done in a standard way, we shouldn't introduce something new. Here are some of my thoughts:

1. Scope: Agree.
2. Constraints: Agree.
4. Time attributes:
4a. Timestamps are in the schema twice. If an existing field is Long/Timestamp, we can just use it as rowtime; it is not defined twice. If it is not a Long/Timestamp, we use a computed column to get an expected timestamp column to be the rowtime. Is this what you mean by defined twice? But I don't think it is a problem; rather an advantage, because it is easy to use: users do not need to consider whether to "replace the existing column" or "add a new column", they will not be confused about what the real schema is, what the index of