Re: [VOTE] Release flink-shaded 7.0, release candidate 2
Thank you Jincheng for the release! +1 (non-binding) - Release notes are correct. - Built from source archive successfully. - Signatures and hash are correct. - All artifacts (11 artifacts, including flink-shaded) have been deployed to the Maven Central repository. One minor comment for the website pull request but I think it is not a blocker. Best, Hequn On Mon, May 20, 2019 at 9:20 AM jincheng sun wrote: > Hi everyone, > > Please review and vote on the release candidate #2 for the version 7.0, as > follows: > > [ ] +1, Approve the release > [ ] -1, Do not approve the release (please provide specific comments) > > The complete staging area is available for your review, which includes: > * JIRA release notes [1], > * the official Apache source release to be deployed to dist.apache.org > [2], > which are signed with the key with fingerprint > 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [3], > * all artifacts to be deployed to the Maven Central Repository [4], > * source code tag "release-7.0-rc2" [5], > * website pull request listing the new release [6]. > > The vote will be open for at least 72 hours. It is adopted by majority > approval, with at least 3 PMC affirmative votes. > > Thanks, > Jincheng > > [1] > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12345226&styleName=Html&projectId=12315522&Create=Create&atl_token=A5KQ-2QAV-T4JA-FDED%7C8ba061049bec0c5a72dc0191c47bb53a73b82cb4%7Clin > [2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-7.0-rc2/ > [3] https://dist.apache.org/repos/dist/release/flink/KEYS > [4] https://repository.apache.org/content/repositories/orgapacheflink-1218 > [5] https://github.com/apache/flink-shaded/tree/release-7.0-rc2 > [6] https://github.com/apache/flink-web/pull/209 >
[jira] [Created] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment
Kazunori Shinhira created FLINK-12586: - Summary: Stderr and stdout are reversed in OptimizerPlanEnvironment Key: FLINK-12586 URL: https://issues.apache.org/jira/browse/FLINK-12586 Project: Flink Issue Type: Bug Components: Command Line Client Affects Versions: 1.8.0, 1.7.2 Reporter: Kazunori Shinhira In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout is output as System.err and stderr is output as System.out. [https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108] I think it should be as below, so that each stream is checked and printed under its own label.
{code:java}
throw new ProgramInvocationException(
    "The program plan could not be fetched - the program aborted pre-maturely."
        + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
        + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12585) Align Stream/BatchTableEnvironment with JAVA Table API
sunjincheng created FLINK-12585: --- Summary: Align Stream/BatchTableEnvironment with JAVA Table API Key: FLINK-12585 URL: https://issues.apache.org/jira/browse/FLINK-12585 Project: Flink Issue Type: Sub-task Components: API / Python Affects Versions: 1.9.0 Reporter: sunjincheng Initially we wanted to align with the [FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions] plan and unify the TableEnvironment, such as:
{code:java}
TableConfig config = TableConfig.builder()
    .asStreamingExecution()
    // example of providing configuration that was in StreamExecutionEnvironment before
    .watermarkInterval(100)
    .build();
TableEnvironment tEnv = TableEnvironment.create(config);
{code}
So the current Python Table API looks as follows:
{code:java}
self.t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
self.t_env = TableEnvironment.create(self.t_config)
{code}
But since the Java API has not made this improvement yet, and the 1.9 release deadline is approaching, it is better to align `Stream/BatchTableEnvironment` with the JAVA Table API for now, i.e. follow the current Java style, for example:
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend)
val tEnv = StreamTableEnvironment.create(env)
{code}
What do you think? [~dian.fu] [~WeiZhong] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12584) Add Bucket File System Table Sink
zhangjun created FLINK-12584: Summary: Add Bucket File System Table Sink Key: FLINK-12584 URL: https://issues.apache.org/jira/browse/FLINK-12584 Project: Flink Issue Type: New Feature Components: Table SQL / API Affects Versions: 1.8.0, 1.9.0 Reporter: zhangjun Assignee: zhangjun *Motivation* In Flink, the file system (especially HDFS) is a very common output, but for SQL users there is no way to write data directly to the file system with SQL. I therefore want to add a bucket file system table sink that users can register with the StreamTableEnvironment, so that both the Table API and SQL can use it to write streaming data to the file system. *Example*
tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("name", Types.STRING())
        .field("age", Types.INT()))
    .inAppendMode()
    .registerTableSink("myhdfssink");
tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");
*Some ideas to achieve this function* 1) Add a class Bucket which extends ConnectorDescriptor and carries properties such as basePath (a sketch follows below). 2) Add a class BucketValidator which extends ConnectorDescriptorValidator and is used to validate the bucket descriptor. 3) Add a class FileSystemTableSink implementing the StreamTableSink interface; in the emitDataStream method, construct a StreamingFileSink for writing data to the file system according to the configured properties. 4) Add a factory class FileSystemTableSinkFactory implementing the StreamTableSinkFactory interface to construct FileSystemTableSink. 5) The parameter of the withFormat method is an implementation of the FormatDescriptor interface, such as Json or Csv; we can add Parquet and Orc later. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
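As a rough illustration of item 1) above, a Bucket descriptor might look like the sketch below. This is only a hedged sketch of the proposal: the class does not exist in Flink, and the connector type "bucket" and property key "connector.basepath" are made-up placeholders.
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.descriptors.ConnectorDescriptor;

// Hedged sketch of the proposed descriptor; "bucket" and "connector.basepath" are placeholders.
public class Bucket extends ConnectorDescriptor {

    private String basePath;

    public Bucket() {
        // connector type, property version, requires a format descriptor
        super("bucket", 1, true);
    }

    public Bucket basePath(String basePath) {
        this.basePath = basePath;
        return this;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        if (basePath != null) {
            properties.put("connector.basepath", basePath);
        }
        return properties;
    }
}
{code}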
contribute to Apache Flink
Hi, I want to contribute to Apache Flink. Would you please give me the contributor permission? My Jira ID is habren. Thanks & Best Regards, Jason
Checkpoint / Two Phase Commit
A question regarding the end-to-end exactly-once guarantee implemented using 2PC. As I understand how it operates, the pre-commit phase is when the checkpoint is initiated and the checkpoint barrier advances from source to sink. Once the pre-commit phase is complete (and successful), the next step in the process is for the "Sink" operator to "commit" the transaction (the data that was part of the checkpointed state); the data store backing the Sink operator is expected to commit the transaction. If the transaction times out and the data cannot be committed, is there an option to roll back the checkpointed state without incurring data loss? As of now it does not work this way, but I am trying to understand the challenges/limitations of discarding the checkpoint in question. One reason could be the question of what to do with the subsequent checkpoints that might have advanced during this scenario. Anything else? Regards Vijay -- Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
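Not an answer to the rollback question, but for readers following along, here is a minimal, hedged sketch of how the pre-commit and commit steps map onto Flink's TwoPhaseCommitSinkFunction. The Txn type and everything inside it are placeholders, not a real connector.
{code:java}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hedged sketch of the 2PC lifecycle; not a production sink.
public class SketchTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, SketchTwoPhaseSink.Txn, Void> {

    public static class Txn {
        // hypothetical transaction handle, e.g. a temp file or an external transaction id
        String id;
    }

    public SketchTwoPhaseSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        // opened when a new checkpoint interval starts
        return new Txn();
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        // write the record into the open (uncommitted) transaction
    }

    @Override
    protected void preCommit(Txn txn) {
        // flush on the checkpoint barrier; after this, the data must be committable
    }

    @Override
    protected void commit(Txn txn) {
        // called once the checkpoint completed; must eventually succeed (retried on failure)
    }

    @Override
    protected void abort(Txn txn) {
        // called when the checkpoint/transaction is discarded
    }
}
{code}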
[jira] [Created] (FLINK-12583) Add all format support align with the Java Table API
sunjincheng created FLINK-12583: --- Summary: Add all format support align with the Java Table API Key: FLINK-12583 URL: https://issues.apache.org/jira/browse/FLINK-12583 Project: Flink Issue Type: Sub-task Components: API / Python Affects Versions: 1.9.0 Reporter: sunjincheng We have added the `CSV` format and a base test case for it in FLINK-12439; we also need to add the other formats, such as JSON, PARQUET, AVRO, etc., to the Python Table API to align with the JAVA Table API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12582) Alterations in GenericInMemoryCatalog should check existing object and new object are of the same class
Bowen Li created FLINK-12582: Summary: Alterations in GenericInMemoryCatalog should check existing object and new object are of the same class Key: FLINK-12582 URL: https://issues.apache.org/jira/browse/FLINK-12582 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Alterations in GenericInMemoryCatalog should check that the existing object and the new object are of the same class. Currently they don't, so you can replace an existing generic table with a new Hive table. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
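For illustration, the check could look roughly like the sketch below; the helper class and method names are made up, and this is not the actual catalog code.
{code:java}
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;

// Hedged sketch of the kind of check an alter*() method could perform before replacing the object.
final class AlterTypeCheck {
    static void checkSameKind(CatalogBaseTable existing, CatalogBaseTable replacement) {
        if (!existing.getClass().equals(replacement.getClass())) {
            throw new CatalogException(String.format(
                "Table types don't match. Existing table is '%s' and new table is '%s'.",
                existing.getClass().getName(), replacement.getClass().getName()));
        }
    }
}
{code}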
[jira] [Created] (FLINK-12581) HiveCatalog.alterTable() and alterDatabase() should check the existing object and new object are of same type
Bowen Li created FLINK-12581: Summary: HiveCatalog.alterTable() and alterDatabase() should check the existing object and new object are of same type Key: FLINK-12581 URL: https://issues.apache.org/jira/browse/FLINK-12581 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 HiveCatalog.alterTable() and alterDatabase() should check that the existing object and the new object are of the same type. Currently they don't, so you can replace an existing generic table with a new Hive table. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12580) Rename GenericHiveMetastoreCatalogTest to HiveCatalogFlinkMetadataTest, and HiveCatalogTest to HiveCatalogHiveMetadataTest
Bowen Li created FLINK-12580: Summary: Rename GenericHiveMetastoreCatalogTest to HiveCatalogFlinkMetadataTest, and HiveCatalogTest to HiveCatalogHiveMetadataTest Key: FLINK-12580 URL: https://issues.apache.org/jira/browse/FLINK-12580 Project: Flink Issue Type: Sub-task Components: Connectors / Hive Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.9.0 Rename GenericHiveMetastoreCatalogTest to HiveCatalogFlinkMetadataTest, and HiveCatalogTest to HiveCatalogHiveMetadataTest, since we unified GenericHiveMetastoreCatalog and HiveCatalog into a new HiveCatalog -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12579) Influx Metric exporter throws a lot of negative infinity errors
Theo Diefenthal created FLINK-12579: --- Summary: Influx Metric exporter throws a lot of negative infinity errors Key: FLINK-12579 URL: https://issues.apache.org/jira/browse/FLINK-12579 Project: Flink Issue Type: Bug Reporter: Theo Diefenthal When using the InfluxDB metrics reporter, the logs are polluted by lots of warnings due to negative-infinity values which InfluxDB can't handle:
{code}
2019-05-21 18:24:40,410 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while reporting metrics
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to parse 'taskmanager_job_task_operator_sync-time-max,host=..,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=..,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\ ..._kafka_source_...,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\ ..._kafka_source_PROXYLOGS\ ->\ ..\ ->\ ...,tm_id=container_e101_1557348638026_49850_01_02 value=-∞ 155846308013300': invalid number
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_join-time-max,host=...,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=...,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\ ..._kafka_source_...,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\ ..._kafka_source_...,tm_id=container_e101_1557348638026_49850_01_02 value=-∞ 155846308013300': invalid number
unable to parse 'taskmanager_job_task_operator_KafkaConsumer_heartbeat-response-time-max,host=...,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=...,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\ .._kafka_source_..,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\ .._kafka_source_...,tm_id=container_e101_1557348638026_49850_01_02 value=-∞ 155846308013300': invalid number dropped=0
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionFromErrorMessage(InfluxDBException.java:147)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionForErrorState(InfluxDBException.java:173)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.execute(InfluxDBImpl.java:796)
    at org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:455)
    at org.apache.flink.metrics.influxdb.InfluxdbReporter.report(InfluxdbReporter.java:97)
    at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
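One possible direction (just an illustration of the idea, not the actual InfluxdbReporter code) would be to skip non-finite gauge values before building the InfluxDB point, roughly like this:
{code:java}
// Hedged sketch: drop NaN/±Infinity values instead of sending them to InfluxDB.
private static boolean isValidInfluxField(Object value) {
    if (value instanceof Double) {
        return Double.isFinite((Double) value);
    }
    if (value instanceof Float) {
        return Float.isFinite((Float) value);
    }
    // other numeric types (int, long, ...) cannot be NaN or infinite
    return value instanceof Number;
}
{code}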
[jira] [Created] (FLINK-12578) Use secure URLs for Maven repositories
Jungtaek Lim created FLINK-12578: Summary: Use secure URLs for Maven repositories Key: FLINK-12578 URL: https://issues.apache.org/jira/browse/FLINK-12578 Project: Flink Issue Type: Improvement Components: Build System Reporter: Jungtaek Lim Assignee: Jungtaek Lim Currently, some of the repository URLs in the Maven pom.xml files use the http scheme; ideally they should use https. Below is the list of repositories that currently use http in the pom files: * Confluent * HWX * MapR -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12577) Flink uses Kryo serializer if given a SpecificRecord type
Nico Kruber created FLINK-12577: --- Summary: Flink uses Kryo serializer if given a SpecificRecord type Key: FLINK-12577 URL: https://issues.apache.org/jira/browse/FLINK-12577 Project: Flink Issue Type: Bug Components: API / Type Serialization System Affects Versions: 1.8.0, 1.7.2, 1.9.0 Reporter: Nico Kruber If a data stream is not typed with a concrete *subclass* of {{SpecificRecord}} (but with {{SpecificRecord}} itself), Flink will actually infer a {{GenericTypeInfo}} and use Kryo instead. Example:
{code}
StreamExecutionEnvironment env = ...
env.getConfig().disableGenericTypes();

DataStream<SpecificRecord> sourceStream = env.addSource(...);
DataStream<SpecificRecord> test =
    sourceStream.map(value -> new AggregatedSensorStatistics());

public class AggregatedSensorStatistics extends org.apache.avro.specific.SpecificRecordBase
    implements org.apache.avro.specific.SpecificRecord {...}
{code}
This will lead to
{code}
Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.specific.SpecificRecord is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:208)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:541)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1537)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
    at com.ververica.training.statemigration.StateMigrationJobBase.createAndExecuteJob(StateMigrationJobBase.java:68)
    at com.ververica.training.statemigration.avro.StateMigrationJob.main(StateMigrationJob.java:13)
{code}
You may want some flexibility in your types and thus not provide the exact one like {{AggregatedSensorStatistics}} in this example. I don't see any reason we should disallow that behaviour. The reason for it is that {{TypeExtractor#privateGetForClass()}} has this code, which only sees classes as Avro types if they *extend* {{SpecificRecordBase}}:
{code}
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
    return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
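As a possible workaround until this is changed (a hedged sketch based on the example above, not a confirmed recommendation), the concrete Avro type can be given to Flink explicitly so that Avro type information is used instead of Kryo:
{code:java}
// Hedged sketch: declare the concrete SpecificRecord subclass explicitly via returns().
DataStream<AggregatedSensorStatistics> test =
    sourceStream
        .map(value -> new AggregatedSensorStatistics())
        .returns(AggregatedSensorStatistics.class);
{code}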
contribute to Apache Flink
Hi, I want to contribute to Apache Flink. Would you please give me the contributor permission? My JIRA ID is lujisen. Wishing you smooth work and good health! * 陆继森 Mobile: 18560768631 E-Mail: luj...@126.com *
Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs
Yes, this is our conclusion. I'd like to add only one point: registering user-defined aggregators is also needed, which is currently provided by the 'bridge' and will finally be merged into the Table API; it's the same as collect(). I will add a TableEnvironment argument to Estimator.fit() and Transformer.transform() to get rid of the dependency on flink-table-planner. This will be committed soon. Aljoscha Krettek wrote on Tue, May 21, 2019 at 7:31 PM: > We discussed this in private and came to the conclusion that we should > (for now) have the dependency on flink-table-api-xxx-bridge because we need > access to the collect() method, which is not yet available in the Table > API. Once that is available the code can be refactored but for now we want > to unblock work on this new module. > > We also agreed that we don’t need a direct dependency on > flink-table-planner. > > I hope I summarised our discussion correctly. > > > On 17. May 2019, at 12:20, Gen Luo wrote: > > > > Thanks for your reply. > > > > For the first question, it's not strictly necessary. But I prefer not to > > have a TableEnvironment argument in Estimator.fit() or > > Transformer.transform(), which is not part of the machine learning concept, and > > may make our API not as clean and pretty as other systems do. I would like > > another way other than introducing flink-table-planner to do this. If it's > > impossible or severely opposed, I may make the concession to add the > > argument. > > > > Other than that, "flink-table-api-xxx-bridge"s are still needed. A very > > common case is that an algorithm needs to guarantee that it's running under > > a BatchTableEnvironment, which makes it possible to collect results each > > iteration. A typical algorithm like this is ALS. As of Flink 1.8, this can be > > only achieved by converting a Table to a DataSet and then calling DataSet.collect(), > > which is available in flink-table-api-xxx-bridge. Besides, registering > > UDAGGs also depends on it. > > > > In conclusion, "planner" can be removed from the dependencies but introducing > > the "bridge"s is inevitable. Whether and how to acquire a TableEnvironment from > > a Table can be discussed. > >
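For context, a hedged sketch of why the bridge is still needed for collect(): today a Table has to be converted to a DataSet first. The class and method names below are illustrative; they assume the flink-table-api-java-bridge BatchTableEnvironment.
{code:java}
import java.util.List;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// Hedged sketch: collecting a Table's rows currently goes through the DataSet bridge.
public class CollectViaBridgeSketch {
    public static List<Row> collectRows(BatchTableEnvironment tEnv, Table table) throws Exception {
        // e.g. an iterative algorithm such as ALS can inspect these rows to drive the next iteration
        return tEnv.toDataSet(table, Row.class).collect();
    }
}
{code}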
[jira] [Created] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels
Piotr Nowojski created FLINK-12576: -- Summary: inputQueueLength metric does not work for LocalInputChannels Key: FLINK-12576 URL: https://issues.apache.org/jira/browse/FLINK-12576 Project: Flink Issue Type: Bug Components: Runtime / Metrics Affects Versions: 1.8.0, 1.7.2, 1.6.4 Reporter: Piotr Nowojski Currently {{inputQueueLength}} ignores LocalInputChannels ({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes when looking for the causes of back pressure (e.g., if a task is back-pressuring the whole Flink job, but there is data skew and only local input channels are being used). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12575) Introduce planner rules to remove redundant shuffle and collation
godfrey he created FLINK-12575: -- Summary: Introduce planner rules to remove redundant shuffle and collation Key: FLINK-12575 URL: https://issues.apache.org/jira/browse/FLINK-12575 Project: Flink Issue Type: New Feature Components: Table SQL / Planner Reporter: godfrey he Assignee: godfrey he {{Exchange}} and {{Sort}} are among the heaviest operators; they are created in {{FlinkExpandConversionRule}} when some operators require their inputs to satisfy a distribution trait or a collation trait in planner rules. However, many operators could provide distribution or collation themselves, e.g. {{BatchExecHashAggregate}} or {{BatchExecHashJoin}} could provide distribution on its shuffle keys, and {{BatchExecSortMergeJoin}} could provide distribution and collation on its join keys. If the provided traits satisfy the required traits, the {{Exchange}} or the {{Sort}} is redundant. e.g.
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e
  join t1 on d = a1 and e = b1
  left outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], leftSorted=[true], ...)
:- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(d, a1), =(e, b1))], leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  :     +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
:     +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}
In the above physical plan, the {{Exchange}}s between the {{SortMergeJoin}}s are redundant because their shuffle keys are the same, and the {{Sort}}s on the left-hand side of the top two {{SortMergeJoin}}s are redundant because their input is already sorted. Another situation is that the shuffle and collation could be removed between multiple {{Over}}s. e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
  COUNT(*) OVER (PARTITION BY c ORDER BY a),
  SUM(a) OVER (PARTITION BY b ORDER BY a),
  RANK() OVER (PARTITION BY c ORDER BY a, c),
  SUM(a) OVER (PARTITION BY b ORDER BY a),
  COUNT(*) OVER (PARTITION BY c ORDER BY c)
FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w3$o0 RANG ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w1$o0 RANG ...], window#1=[RANK(*) AS w2$o0 RANG ...], ...)
      +- Sort(orderBy=[c ASC, a ASC])
         +- Exchange(distribution=[hash[c]])
            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o1, $SUM0(a) AS w0$o0 RANG ...], ...)
               +- Sort(orderBy=[b ASC, a ASC])
                  +- Exchange(distribution=[hash[b]])
                     +- TableSourceScan(table=[[MyTable]], ...)
{code}
The {{Exchange}}s and the {{Sort}} between the top two {{OverAggregate}}s are redundant because their shuffle keys and sort keys are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs
We discussed this in private and came to the conclusion that we should (for now) have the dependency on flink-table-api-xxx-bridge because we need access to the collect() method, which is not yet available in the Table API. Once that is available the code can be refactored but for now we want to unblock work on this new module. We also agreed that we don’t need a direct dependency on flink-table-planner. I hope I summarised our discussion correctly. > On 17. May 2019, at 12:20, Gen Luo wrote: > > Thanks for your reply. > > For the first question, it's not strictly necessary. But I prefer not to > have a TableEnvironment argument in Estimator.fit() or > Transformer.transform(), which is not part of the machine learning concept, and > may make our API not as clean and pretty as other systems do. I would like > another way other than introducing flink-table-planner to do this. If it's > impossible or severely opposed, I may make the concession to add the > argument. > > Other than that, "flink-table-api-xxx-bridge"s are still needed. A very > common case is that an algorithm needs to guarantee that it's running under > a BatchTableEnvironment, which makes it possible to collect results each > iteration. A typical algorithm like this is ALS. As of Flink 1.8, this can be > only achieved by converting a Table to a DataSet and then calling DataSet.collect(), > which is available in flink-table-api-xxx-bridge. Besides, registering > UDAGGs also depends on it. > > In conclusion, "planner" can be removed from the dependencies but introducing > the "bridge"s is inevitable. Whether and how to acquire a TableEnvironment from > a Table can be discussed.
[jira] [Created] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss
yitzchak lieberman created FLINK-12574: -- Summary: using sink StreamingFileSink files are overwritten when resuming application causing data loss Key: FLINK-12574 URL: https://issues.apache.org/jira/browse/FLINK-12574 Project: Flink Issue Type: Bug Components: Connectors / FileSystem Affects Versions: 1.8.0 Reporter: yitzchak lieberman When part files are saved to an S3 bucket (with a bucket assigner) under simple names such as part-0-0 and part-1-2, restarting or resuming the application causes the checkpoint id to start from 0 again, so old part files are replaced by the new part files and data is lost. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
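For reference, a minimal sketch of the kind of setup described here; the path, record type, and surrounding stream are illustrative assumptions, not from the report.
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// Hedged sketch: a row-format StreamingFileSink with a bucket assigner. Part files are named
// part-<subtaskIndex>-<partCounter>, so a counter that restarts at 0 after a resume can
// produce names that collide with files already present in the bucket.
public class FileSinkSketch {
    public static void attachSink(DataStream<String> stream) {
        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("s3://my-bucket/output"), new SimpleStringEncoder<String>("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner<>())
            .build();
        stream.addSink(sink);
    }
}
{code}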
[jira] [Created] (FLINK-12573) ability to add suffix to part file created in Bucket (StreamingFileSink)
yitzchak lieberman created FLINK-12573: -- Summary: ability to add suffix to part file created in Bucket (StreamingFileSink) Key: FLINK-12573 URL: https://issues.apache.org/jira/browse/FLINK-12573 Project: Flink Issue Type: Improvement Reporter: yitzchak lieberman Add the possibility to append a suffix to the part file path, which is currently fixed as: new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter); -- This message was sent by Atlassian JIRA (v7.6.3#76005)
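Presumably the requested change would amount to something like the following; `partSuffix` is a hypothetical, user-supplied setting, only to illustrate the idea.
{code:java}
// Hypothetical sketch: append a configurable suffix when assembling the part file name.
new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter + partSuffix);
{code}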
[jira] [Created] (FLINK-12572) Implement TableSource and InputFormat to read Hive tables
zjuwangg created FLINK-12572: Summary: Implement TableSource and InputFormat to read Hive tables Key: FLINK-12572 URL: https://issues.apache.org/jira/browse/FLINK-12572 Project: Flink Issue Type: Sub-task Components: Connectors / Hive, Table SQL / Ecosystem Affects Versions: 1.9.0 Reporter: zjuwangg -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the bound data port
zhijiang created FLINK-12571: Summary: Make NetworkEnvironment#start() return the bound data port Key: FLINK-12571 URL: https://issues.apache.org/jira/browse/FLINK-12571 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: zhijiang Assignee: zhijiang Currently `NetworkEnvironment#getConnectionManager()` is mainly used by `TaskManagerServices` for getting the bound data port from `NettyConnectionManager`. Since the `ConnectionManager` is an internal component of `NetworkEnvironment`, it should not be exposed to the outside. Other ShuffleService implementations might have no `ConnectionManager` at all. We could make `ShuffleService#start()` return the bound data port to replace `getConnectionManager`. For the `LocalConnectionManager` or other shuffle service implementations which have no bound data port, start() could return a simple default value, which would do no harm. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
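A hedged sketch of the proposed shape (illustrative only; the actual interface may differ, and any method beyond start() is an assumption):
{code:java}
import java.io.IOException;

// Hedged sketch of the proposal: start() returns the bound data port, or a default
// value (e.g. -1) for implementations such as LocalConnectionManager that bind no port.
interface ShuffleService {
    int start() throws IOException;

    void shutdown();
}
{code}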
[jira] [Created] (FLINK-12570) Switch Task from ResultPartition to ResultPartitionWriter interface
Andrey Zagrebin created FLINK-12570: --- Summary: Switch Task from ResultPartition to ResultPartitionWriter interface Key: FLINK-12570 URL: https://issues.apache.org/jira/browse/FLINK-12570 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Andrey Zagrebin Assignee: Andrey Zagrebin Fix For: 1.9.0 This is part of the Shuffle API refactoring: make Task not depend on the concrete implementation of ResultPartitionWriter (ResultPartition). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12569) Broken/Outdated docker/build.sh for flink 1.8.0
Ankit Chaudhary created FLINK-12569: --- Summary: Broken/Outdated docker/build.sh for flink 1.8.0 Key: FLINK-12569 URL: https://issues.apache.org/jira/browse/FLINK-12569 Project: Flink Issue Type: Bug Components: Deployment / Docker Affects Versions: 1.8.0 Reporter: Ankit Chaudhary I was reading through the release notes of Flink 1.8.0 and found out that we no longer release binaries for specific versions of Hadoop (https://issues.apache.org/jira/browse/FLINK-11266). Because of this change, the docker build script for the job cluster docker image fails when supplied CLI arguments for a specific version of Flink, Hadoop, and Scala (because no such file exists at the remote location "https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/${FLINK_DIST_FILE_NAME}"). Following is the location of the source: [https://github.com/apache/flink/tree/release-1.8/flink-container/docker] Following is the failure I received
{code:java}
Step 11/16 : RUN set -x && ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && chown -h flink:flink $FLINK_HOME
 ---> Running in b28403d5820b
+ ln -s /opt/flink-1.8.0-bin-hadoop28-scala_2.11.tgz /opt/flink
+ ln -s /opt/job.jar /opt/flink/lib
ln: /opt/flink/lib: Not a directory
The command '/bin/sh -c set -x && ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && chown -h flink:flink $FLINK_HOME' returned a non-zero code: 1
{code}
Following was the command passed:
{code:java}
./build.sh --job-jar --from-release --flink-version 1.8.0 --hadoop-version 2.8 --scala-version 2.11 --image-name flink-custom-image:0.0.0
{code}
I think we should change the script and update the readme file to reflect the changes done via FLINK-11266. NOTE: * I am not sure if the Jira component I have selected is correct or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12568) Implement TableSink and OutputFormat to write Hive tables
Rui Li created FLINK-12568: -- Summary: Implement TableSink and OutputFormat to write Hive tables Key: FLINK-12568 URL: https://issues.apache.org/jira/browse/FLINK-12568 Project: Flink Issue Type: Sub-task Reporter: Rui Li Assignee: Rui Li -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12567) Rework DescriptorProperties to adapt unified DDL with clause and Descriptor key value pairs
Danny Chan created FLINK-12567: -- Summary: Rework DescriptorProperties to adapt unified DDL with clause and Descriptor key value pairs Key: FLINK-12567 URL: https://issues.apache.org/jira/browse/FLINK-12567 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.8.0 Reporter: Danny Chan Assignee: Danny Chan Fix For: 1.9.0 After introducing DDLs, we need to unify the key-value properties format between the DDL WITH clause and the Descriptor API, so this tool class also needs to be reworked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12566) Remove row interval type
Timo Walther created FLINK-12566: Summary: Remove row interval type Key: FLINK-12566 URL: https://issues.apache.org/jira/browse/FLINK-12566 Project: Flink Issue Type: Sub-task Components: Table SQL / API Reporter: Timo Walther Assignee: Timo Walther The row interval type just adds additional complexity and prevents SQL queries from supporting count windows. A regular {{BIGINT}} type is sufficient to represent a count. -- This message was sent by Atlassian JIRA (v7.6.3#76005)