Re: [VOTE] Release flink-shaded 7.0, release candidate 2

2019-05-21 Thread Hequn Cheng
Thank you Jincheng for the release!

+1 (non-binding)

- Release notes are correct.
- Built from source archive successfully.
- Signatures and hash are correct.
- All artifacts (11 artifacts, including flink-shaded) have been deployed to
the Maven Central repository.

One minor comment on the website pull request, but I think it is not a
blocker.

Best, Hequn


On Mon, May 20, 2019 at 9:20 AM jincheng sun 
wrote:

> Hi everyone,
>
> Please review and vote on the release candidate #2 for the version 7.0, as
> follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
> * JIRA release notes [1],
> * the official Apache source release to be deployed to dist.apache.org
> [2],
> which are signed with the key with fingerprint
> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [3],
> * all artifacts to be deployed to the Maven Central Repository [4],
> * source code tag "release-7.0-rc2" [5],
> * website pull request listing the new release [6].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Jincheng
>
> [1]
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12345226=Html=12315522=Create_token=A5KQ-2QAV-T4JA-FDED%7C8ba061049bec0c5a72dc0191c47bb53a73b82cb4%7Clin
> [2] https://dist.apache.org/repos/dist/dev/flink/flink-shaded-7.0-rc2/
> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> [4] https://repository.apache.org/content/repositories/orgapacheflink-1218
> [5] https://github.com/apache/flink-shaded/tree/release-7.0-rc2
> [6] https://github.com/apache/flink-web/pull/209
>


[jira] [Created] (FLINK-12586) Stderr and stdout are reversed in OptimizerPlanEnvironment

2019-05-21 Thread Kazunori Shinhira (JIRA)
Kazunori Shinhira created FLINK-12586:
-

 Summary: Stderr and stdout are reversed in OptimizerPlanEnvironment
 Key: FLINK-12586
 URL: https://issues.apache.org/jira/browse/FLINK-12586
 Project: Flink
  Issue Type: Bug
  Components: Command Line Client
Affects Versions: 1.8.0, 1.7.2
Reporter: Kazunori Shinhira


In the OptimizerPlanEnvironment#getOptimizedPlan method, it looks like stdout 
is reported as System.err and stderr is reported as System.out.

[https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L107-L108]

 

I think it should be as below.
{code:java}
throw new ProgramInvocationException(
    "The program plan could not be fetched - the program aborted pre-maturely."
        + "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
        + "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12585) Align Stream/BatchTableEnvironment with JAVA Table API

2019-05-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12585:
---

 Summary: Align Stream/BatchTableEnvironment with JAVA Table API
 Key: FLINK-12585
 URL: https://issues.apache.org/jira/browse/FLINK-12585
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.9.0
Reporter: sunjincheng


Initially we wanted to align with the 
[FLIP-32|https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions]
 plan and unify the TableEnvironment, for example:
{code:java}
TableConfig config = TableConfig.builder()
    .asStreamingExecution()
    // example of providing configuration that was in StreamExecutionEnvironment before
    .watermarkInterval(100)
    .build();

TableEnvironment tEnv = TableEnvironment.create(config);{code}
The current Python Table API looks as follows:
{code:java}
self.t_config = TableConfig.Builder().as_streaming_execution().set_parallelism(1).build()
self.t_env = TableEnvironment.create(self.t_config){code}
However, since the Java API has not made this improvement yet and the end date 
of release 1.9 is approaching, it is better to align 
`Stream/BatchTableEnvironment` with the Java Table API for now. We should follow 
the current Java style, for example:
{code:java}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(getStateBackend) 
val tEnv = StreamTableEnvironment.create(env){code}
What do you think? [~dian.fu] [~WeiZhong] 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12584) Add Bucket File System Table Sink

2019-05-21 Thread zhangjun (JIRA)
zhangjun created FLINK-12584:


 Summary: Add Bucket File System Table Sink
 Key: FLINK-12584
 URL: https://issues.apache.org/jira/browse/FLINK-12584
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API
Affects Versions: 1.8.0, 1.9.0
Reporter: zhangjun
Assignee: zhangjun


*Motivation*

In Flink, the file system (especially HDFS) is a very common output, but for 
SQL users there is no way to write data to the file system directly with SQL. 
So I want to add a bucket file system table sink that users can register with 
StreamTableEnvironment, so that both the Table API and SQL can use the sink to 
write stream data to the filesystem.
*Example*

tEnv.connect(new Bucket().basePath("hdfs://localhost/tmp/flink-data"))
    .withFormat(new Json().deriveSchema())
    .withSchema(new Schema()
        .field("name", Types.STRING())
        .field("age", Types.INT()))
    .inAppendMode()
    .registerTableSink("myhdfssink");

tEnv.sqlUpdate("insert into myhdfssink SELECT * FROM mytablesource");

 
*Some ideas to achieve this function*

1) Add a class Bucket which extends ConnectorDescriptor and add some 
properties, such as basePath (see the sketch after this list).

2) Add a class BucketValidator which extends ConnectorDescriptorValidator and 
is used to validate the bucket descriptor.

3) Add a class FileSystemTableSink that implements the StreamTableSink interface. 
In the emitDataStream method, construct a StreamingFileSink for writing data to 
the filesystem according to the different properties.

4) Add a factory class FileSystemTableSinkFactory that implements the 
StreamTableSinkFactory interface for constructing FileSystemTableSink.

5) The parameter of the withFormat method is an implementation class of the 
FormatDescriptor interface, such as Json or Csv; we can add Parquet and Orc later.
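A minimal sketch of what the Bucket descriptor from item 1) could look like, assuming 
only a basePath property for now; the connector type string and property key below are 
illustrative assumptions, not an agreed design:
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.descriptors.ConnectorDescriptor;

// Sketch of the Bucket connector descriptor from item 1). The connector type
// "bucket", the property version 1 and the property key are assumptions.
public class Bucket extends ConnectorDescriptor {

    private String basePath;

    public Bucket() {
        // type = "bucket", property version = 1, a format descriptor is required
        super("bucket", 1, true);
    }

    public Bucket basePath(String basePath) {
        this.basePath = basePath;
        return this;
    }

    @Override
    protected Map<String, String> toConnectorProperties() {
        Map<String, String> properties = new HashMap<>();
        if (basePath != null) {
            properties.put("connector.basepath", basePath);
        }
        return properties;
    }
}
{code}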



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


contribute to Apache Flink

2019-05-21 Thread Jason Guo
Hi,

I want to contribute to Apache Flink.
Would you please give me the contributor permission?
My Jira ID is habren


Thanks & Best Regards,
Jason


Checkpoint / Two Phase Commit

2019-05-21 Thread vijikarthi
Question regarding end-to-end exactly once guarantee implementation using
2PC? 

As I understand how it operates, the pre-commit phase is when the checkpoint
is initiated and the checkpoint barrier advances from source to sink. Once
the pre-commit phase is complete (and successful), the next step in the
process is where the "Sink" operator is expected to "commit" the transaction
(the data that was part of the checkpointed state). The data store backing
the sink operator is expected to commit the transaction.
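For reference, this is the contract exposed by Flink's TwoPhaseCommitSinkFunction;
below is a minimal, non-functional sketch of its lifecycle hooks, assuming a plain
String as the transaction handle and leaving the bodies as placeholder comments:

{code:java}
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch of the 2PC sink contract: preCommit() runs when the barrier reaches
// the sink, commit() runs only after the checkpoint is confirmed complete.
public class SketchTwoPhaseSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public SketchTwoPhaseSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        return "txn-" + System.nanoTime(); // open a transaction in the external system
    }

    @Override
    protected void invoke(String transaction, String value, Context context) {
        // write the record into the open transaction (not yet visible to readers)
    }

    @Override
    protected void preCommit(String transaction) {
        // flush the transaction; part of the checkpoint (pre-commit phase)
    }

    @Override
    protected void commit(String transaction) {
        // make the data visible; called once the checkpoint has completed
    }

    @Override
    protected void abort(String transaction) {
        // discard the transaction, e.g. after a failure before completion
    }
}
{code}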

Say the transaction times out and the data cannot be committed - is there an
option to roll back the checkpointed state without incurring data loss?
As of now it does not work this way, but I am trying to understand the
challenges/limitations with respect to discarding the checkpoint in
question.

One reason could be the question of what to do with the subsequent
checkpoints that might have advanced during this scenario. Anything else?

Regards
Vijay



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/


[jira] [Created] (FLINK-12583) Add all format support align with the Java Table API

2019-05-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-12583:
---

 Summary: Add all format support align with the Java Table API
 Key: FLINK-12583
 URL: https://issues.apache.org/jira/browse/FLINK-12583
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.9.0
Reporter: sunjincheng


 We have added the `CSV` format and a base test case for formats in 
FLINK-12439. We also need to add the other formats, such as JSON, Parquet, Avro, 
etc., in the Python Table API to align with the Java Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12582) Alterations in GenericInMemoryCatalog should check existing object and new object are of the same class

2019-05-21 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12582:


 Summary: Alterations in GenericInMemoryCatalog should check 
existing object and new object are of the same class
 Key: FLINK-12582
 URL: https://issues.apache.org/jira/browse/FLINK-12582
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Alterations in GenericInMemoryCatalog should check that the existing object and 
the new object are of the same class. Currently they don't, so you can replace an 
existing generic table with a new Hive table.
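A rough sketch of how the check could look inside GenericInMemoryCatalog#alterTable; 
the internal `tables` map and the exact error wording are assumptions:

{code:java}
// Sketch only: reject an alteration that changes the table's class
// (e.g. replacing a generic table with a Hive table).
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
        throws TableNotExistException {
    CatalogBaseTable existingTable = tables.get(tablePath);
    if (existingTable != null) {
        if (existingTable.getClass() != newTable.getClass()) {
            throw new CatalogException(String.format(
                "Table types don't match. Existing table is '%s' and new table is '%s'.",
                existingTable.getClass().getName(), newTable.getClass().getName()));
        }
        tables.put(tablePath, newTable.copy());
    } else if (!ignoreIfNotExists) {
        throw new TableNotExistException(getName(), tablePath);
    }
}
{code}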



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12581) HiveCatalog.alterTable() and alterDatabase() should check the existing object and new object are of same type

2019-05-21 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12581:


 Summary: HiveCatalog.alterTable() and alterDatabase() should check 
the existing object and new object are of same type
 Key: FLINK-12581
 URL: https://issues.apache.org/jira/browse/FLINK-12581
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


HiveCatalog.alterTable() and alterDatabase() should check that the existing object 
and the new object are of the same type. Currently they don't, so you can replace an 
existing generic table with a new Hive table.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12580) Rename GenericHiveMetastoreCatalogTest to HiveCatalogFlinkMetadataTest, and HiveCatalogTest to HiveCatalogHiveMetadataTest

2019-05-21 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12580:


 Summary: Rename GenericHiveMetastoreCatalogTest to 
HiveCatalogFlinkMetadataTest, and HiveCatalogTest to HiveCatalogHiveMetadataTest
 Key: FLINK-12580
 URL: https://issues.apache.org/jira/browse/FLINK-12580
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


Rename GenericHiveMetastoreCatalogTest to HiveCatalogFlinkMetadataTest, and 
HiveCatalogTest to HiveCatalogHiveMetadataTest, since we unified 
GenericHiveMetastoreCatalog and HiveCatalog into a new HiveCatalog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12579) Influx Metric exporter throws a lot of negative infinity errors

2019-05-21 Thread Theo Diefenthal (JIRA)
Theo Diefenthal created FLINK-12579:
---

 Summary: Influx Metric exporter throws a lot of negative infinity 
errors
 Key: FLINK-12579
 URL: https://issues.apache.org/jira/browse/FLINK-12579
 Project: Flink
  Issue Type: Bug
Reporter: Theo Diefenthal


When using the InfluxDB metrics reporter, the logs are polluted with lots of warnings 
about negative-infinity values, which InfluxDB cannot handle:
2019-05-21 18:24:40,410 WARN  
org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
reporting metrics
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException$UnableToParseException:
 partial write: unable to parse 
'taskmanager_job_task_operator_sync-time-max,host=..,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=..,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\
 
..._kafka_source_...,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\
 ..._kafka_source_PROXYLOGS\ ->\ ..\ ->\ 
...,tm_id=container_e101_1557348638026_49850_01_02 value=-∞ 
155846308013300': invalid number
unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_join-time-max,host=...,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=...,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\
 
..._kafka_source_...,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\
 ..._kafka_source_...,tm_id=container_e101_1557348638026_49850_01_02 
value=-∞ 155846308013300': invalid number

unable to parse 
'taskmanager_job_task_operator_KafkaConsumer_heartbeat-response-time-max,host=...,job_id=d7306cf5af0cf386a9259845d2a79f7c,job_name=...,operator_id=0be27db0efe2375e2766b48026cbee26,operator_name=Source:\
 
.._kafka_source_..,subtask_index=0,task_attempt_id=bbff973b4e71d377745f0f2e3bf884ef,task_attempt_num=0,task_id=0be27db0efe2375e2766b48026cbee26,task_name=Source:\
 .._kafka_source_...,tm_id=container_e101_1557348638026_49850_01_02 
value=-∞ 155846308013300': invalid number dropped=0
at 
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionFromErrorMessage(InfluxDBException.java:147)
at 
org.apache.flink.metrics.influxdb.shaded.org.influxdb.InfluxDBException.buildExceptionForErrorState(InfluxDBException.java:173)
at 
org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.execute(InfluxDBImpl.java:796)
at 
org.apache.flink.metrics.influxdb.shaded.org.influxdb.impl.InfluxDBImpl.write(InfluxDBImpl.java:455)
at 
org.apache.flink.metrics.influxdb.InfluxdbReporter.report(InfluxdbReporter.java:97)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:430)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
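One possible mitigation would be to skip non-finite gauge values before they are turned 
into InfluxDB points; the helper below is only a sketch of that idea, not the reporter's 
actual code:

{code:java}
// Sketch: only report finite numeric gauge values to InfluxDB, dropping
// NaN and +/-Infinity (e.g. the -Infinity join-time-max values above).
private static boolean isReportable(Object value) {
    if (value instanceof Double) {
        double d = (Double) value;
        return !Double.isNaN(d) && !Double.isInfinite(d);
    }
    if (value instanceof Float) {
        float f = (Float) value;
        return !Float.isNaN(f) && !Float.isInfinite(f);
    }
    return value instanceof Number; // other numeric gauges pass through unchanged
}
{code}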
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12578) Use secure URLs for Maven repositories

2019-05-21 Thread Jungtaek Lim (JIRA)
Jungtaek Lim created FLINK-12578:


 Summary: Use secure URLs for Maven repositories
 Key: FLINK-12578
 URL: https://issues.apache.org/jira/browse/FLINK-12578
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Jungtaek Lim
Assignee: Jungtaek Lim


Currently, some of the repository URLs in the Maven pom.xml files use the http 
scheme. Ideally they should use the https scheme.

Below is the list of repositories that currently use the http scheme in pom files:
 * Confluent
 * HWX
 * MapR



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12577) Flink uses Kryo serializer if given a SpecificRecord type

2019-05-21 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-12577:
---

 Summary: Flink uses Kryo serializer if given a SpecificRecord type
 Key: FLINK-12577
 URL: https://issues.apache.org/jira/browse/FLINK-12577
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System
Affects Versions: 1.8.0, 1.7.2, 1.9.0
Reporter: Nico Kruber


If a data stream's declared type is {{SpecificRecord}} itself rather than a concrete 
*subclass* of it, Flink will actually infer a {{GenericTypeInfo}} and use Kryo instead.

Example:
{code}
StreamExecutionEnvironment env = ...
env.getConfig().disableGenericTypes();

DataStream sourceStream = env.addSource(...);
DataStream<SpecificRecord> test = sourceStream.map(value -> new AggregatedSensorStatistics());

public class AggregatedSensorStatistics extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {...}
{code}

This will lead to
{code}
Exception in thread "main" java.lang.UnsupportedOperationException: Generic 
types have been disabled in the ExecutionConfig and type 
org.apache.avro.specific.SpecificRecord is treated as a generic type.
at 
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at 
org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:208)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:541)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:132)
at 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:124)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1537)
at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:89)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at 
com.ververica.training.statemigration.StateMigrationJobBase.createAndExecuteJob(StateMigrationJobBase.java:68)
at 
com.ververica.training.statemigration.avro.StateMigrationJob.main(StateMigrationJob.java:13)
{code}

You may want some flexibility in your types and thus not provide the exact one 
like {{AggregatedSensorStatistics}} in this example. I don't see any reason we 
should disallow that behaviour.

The reason for this is that {{TypeExtractor#privateGetForClass()}} has this 
code, which only treats classes as Avro types if they *extend* {{SpecificRecordBase}}:
{code}
if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
return AvroUtils.getAvroUtils().createAvroTypeInfo(clazz);
}
{code}
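Until the extractor also accepts the {{SpecificRecord}} interface, the practical 
workaround is to give up that flexibility at the one call site and pin the Avro type 
information explicitly; a sketch, using 
{{org.apache.flink.formats.avro.typeutils.AvroTypeInfo}} from flink-avro with the 
concrete record class named explicitly:

{code:java}
// Workaround sketch: hand Flink the Avro type info explicitly so that no
// GenericTypeInfo/Kryo serializer is inferred for the mapped stream.
DataStream<AggregatedSensorStatistics> test = sourceStream
    .map(value -> new AggregatedSensorStatistics())
    .returns(new AvroTypeInfo<>(AggregatedSensorStatistics.class));
{code}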



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


contribute to Apache Flink

2019-05-21 Thread luji...@126.com

Hi,

I want to contribute to Apache Flink.
Would you please give me the contributor permission?
My JIRA ID is lujisen.



Wishing you smooth work and good health!
*
Lu Jisen (陆继森)
Mobile: 18560768631
E-Mail: luj...@126.com
*


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-05-21 Thread Gen Luo
Yes, this is our conclusion. I'd like to add only one point: registering
user-defined aggregators is also needed, which is currently provided by the
'bridge' and will eventually be merged into the Table API. It's the same as
with collect().

I will add a TableEnvironment argument in Estimator.fit() and
Transformer.transform() to get rid of the dependency on
flink-table-planner. This will be committed soon.

Aljoscha Krettek wrote on Tue, May 21, 2019 at 7:31 PM:

> We discussed this in private and came to the conclusion that we should
> (for now) have the dependency on flink-table-api-xxx-bridge because we need
> access to the collect() method, which is not yet available in the Table
> API. Once that is available the code can be refactored but for now we want
> to unblock work on this new module.
>
> We also agreed that we don’t need a direct dependency on
> flink-table-planner.
>
> I hope I summarised our discussion correctly.
>
> > On 17. May 2019, at 12:20, Gen Luo  wrote:
> >
> > Thanks for your reply.
> >
> > For the first question, it's not strictly necessary. But I prefer not to
> > have a TableEnvironment argument in Estimator.fit() or
> > Transformer.transform(), which is not part of machine learning concept,
> and
> > may make our API not as clean and pretty as other systems do. I would
> like
> > another way other than introducing flink-table-planner to do this. If
> it's
> > impossible or severely opposed, I may make the concession to add the
> > argument.
> >
> > Other than that, "flink-table-api-xxx-bridge"s are still needed. A very
> > common case is that an algorithm needs to guarantee that it's running
> under
> > a BatchTableEnvironment, which makes it possible to collect result each
> > iteration. A typical algorithm like this is ALS. By flink1.8, this can be
> > only achieved by converting Table to DataSet and then calling DataSet.collect(),
> > which is available in flink-table-api-xxx-bridge. Besides, registering
> > UDAGG is also depending on it.
> >
> > In conclusion, "planner" can be removed from dependencies but
> introducing
> > "bridge"s are inevitable. Whether and how to acquire TableEnvironment
> from
> > a Table can be discussed.
>
>


[jira] [Created] (FLINK-12576) inputQueueLength metric does not work for LocalInputChannels

2019-05-21 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-12576:
--

 Summary: inputQueueLength metric does not work for 
LocalInputChannels
 Key: FLINK-12576
 URL: https://issues.apache.org/jira/browse/FLINK-12576
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Metrics
Affects Versions: 1.8.0, 1.7.2, 1.6.4
Reporter: Piotr Nowojski


Currently {{inputQueueLength}} ignores LocalInputChannels 
({{SingleInputGate#getNumberOfQueuedBuffers}}). This can cause mistakes 
when looking for the causes of back pressure (for example, if a task is 
back-pressuring the whole Flink job, but there is data skew and only local 
input channels are being used).
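One possible direction (only a sketch; the channel-level accessor below is a 
hypothetical addition, not existing 1.8 API) would be to let every input channel 
expose its queue length so that the gate-level metric no longer skips local channels:

{code:java}
// Sketch: sum the queue length over all channels, local and remote alike.
// unsynchronizedGetNumberOfQueuedBuffers() is an assumed new InputChannel method.
public int getNumberOfQueuedBuffers() {
    int totalBuffers = 0;
    for (InputChannel channel : inputChannels.values()) {
        totalBuffers += channel.unsynchronizedGetNumberOfQueuedBuffers();
    }
    return totalBuffers;
}
{code}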



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12575) Introduce planner rules to remove redundant shuffle and collation

2019-05-21 Thread godfrey he (JIRA)
godfrey he created FLINK-12575:
--

 Summary: Introduce planner rules to remove redundant shuffle and 
collation
 Key: FLINK-12575
 URL: https://issues.apache.org/jira/browse/FLINK-12575
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


{{Exchange}} and {{Sort}} are the heaviest operators. They are created in 
{{FlinkExpandConversionRule}} when some operators require their inputs to satisfy a 
distribution trait or a collation trait in planner rules. However, many operators 
can provide a distribution or collation themselves, e.g. {{BatchExecHashAggregate}} or 
{{BatchExecHashJoin}} can provide a distribution on their shuffle keys, and 
{{BatchExecSortMergeJoin}} can provide a distribution and collation on its join 
keys. If the provided traits satisfy the required traits, the 
{{Exchange}} or the {{Sort}} is redundant.
e.g. 
{code:sql}
schema:
x: a int, b bigint, c varchar
y: d int, e bigint, f varchar
t1: a1 int, b1 bigint, c1 varchar
t2: d1 int, e1 bigint, f1 varchar

sql:
select * from x join y on a = d and b = e join t1 on d = a1 and e = b1 left 
outer join t2 on a1 = d1 and b1 = e1

the physical plan after redundant Exchange and Sort are removed:
SortMergeJoin(joinType=[LeftOuterJoin], where=[AND(=(a1, d1), =(b1, e1))], 
leftSorted=[true], ...)
:- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(d, a1), =(e, b1))], 
leftSorted=[true], ...)
:  :- SortMergeJoin(joinType=[InnerJoin], where=[AND(=(a, d), =(b, e))], ...)
:  :  :- Exchange(distribution=[hash[a, b]])
:  :  :  +- TableSourceScan(table=[[x]], ...)
:  :  +- Exchange(distribution=[hash[d, e]])
:  : +- TableSourceScan(table=[[y]], ...)
:  +- Exchange(distribution=[hash[a1, b1]])
: +- TableSourceScan(table=[[t1]], ...)
+- Exchange(distribution=[hash[d1, e1]])
   +- TableSourceScan(table=[[t2]], ...)
{code}

In the above physical plan, the {{Exchange}}s between the {{SortMergeJoin}}s are 
redundant because their shuffle keys are the same, and the {{Sort}}s on the left-hand 
side of the top two {{SortMergeJoin}}s are redundant because their inputs are already sorted.

Another situation is that the shuffle and collation could be removed between 
multiple {{Over}} aggregates, e.g.
{code:sql}
schema:
MyTable: a int, b int, c varchar

sql:
SELECT
COUNT(*) OVER (PARTITION BY c ORDER BY a),
SUM(a) OVER (PARTITION BY b ORDER BY a),
RANK() OVER (PARTITION BY c ORDER BY a, c),
SUM(a) OVER (PARTITION BY b ORDER BY a),
COUNT(*) OVER (PARTITION BY c ORDER BY c)
 FROM MyTable

the physical plan after redundant Exchange and Sort are removed:
Calc(select=[...])
+- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w3$o0 
RANG ...])
   +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS 
w1$o0 RANG ...], window#1=[RANK(*) AS w2$o0 RANG ...], ...)
  +- Sort(orderBy=[c ASC, a ASC])
 +- Exchange(distribution=[hash[c]])
+- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w1$o1, $SUM0(a) AS w0$o0 RANG ...], ...)
   +- Sort(orderBy=[b ASC, a ASC])
  +- Exchange(distribution=[hash[b]])
 +- TableSourceScan(table=[[MyTable]], ...)
{code}
The {{Exchange}} and {{Sort}} between the top two {{OverAggregate}}s are 
redundant because their shuffle keys and sort keys are the same.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-05-21 Thread Aljoscha Krettek
We discussed this in private and came to the conclusion that we should (for 
now) have the dependency on flink-table-api-xxx-bridge because we need access 
to the collect() method, which is not yet available in the Table API. Once that 
is available the code can be refactored but for now we want to unblock work on 
this new module.

We also agreed that we don’t need a direct dependency on flink-table-planner.

I hope I summarised our discussion correctly.

> On 17. May 2019, at 12:20, Gen Luo  wrote:
> 
> Thanks for your reply.
> 
> For the first question, it's not strictly necessary. But I prefer not to
> have a TableEnvironment argument in Estimator.fit() or
> Transformer.transform(), which is not part of machine learning concept, and
> may make our API not as clean and pretty as other systems do. I would like
> another way other than introducing flink-table-planner to do this. If it's
> impossible or severely opposed, I may make the concession to add the
> argument.
> 
> Other than that, "flink-table-api-xxx-bridge"s are still needed. A very
> common case is that an algorithm needs to guarantee that it's running under
> a BatchTableEnvironment, which makes it possible to collect result each
> iteration. A typical algorithm like this is ALS. By flink1.8, this can be
> only achieved by converting Table to DataSet and then calling DataSet.collect(),
> which is available in flink-table-api-xxx-bridge. Besides, registering
> UDAGG is also depending on it.
> 
> In conclusion, "planner" can be removed from dependencies but introducing
> "bridge"s are inevitable. Whether and how to acquire TableEnvironment from
> a Table can be discussed.



[jira] [Created] (FLINK-12574) using sink StreamingFileSink files are overwritten when resuming application causing data loss

2019-05-21 Thread yitzchak lieberman (JIRA)
yitzchak lieberman created FLINK-12574:
--

 Summary: using sink StreamingFileSink files are overwritten when 
resuming application causing data loss
 Key: FLINK-12574
 URL: https://issues.apache.org/jira/browse/FLINK-12574
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.8.0
Reporter: yitzchak lieberman


When part files are saved to an S3 bucket (with a bucket assigner) with simple names 
such as:

part-0-0 and part-1-2

restarting or resuming the application causes the checkpoint id to start from 0 again, 
and the old files will be overwritten by the new part files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12573) ability to add suffix to part file created in Bucket (StreamingFileSink)

2019-05-21 Thread yitzchak lieberman (JIRA)
yitzchak lieberman created FLINK-12573:
--

 Summary: ability to add suffix to part file created in Bucket 
(StreamingFileSink)
 Key: FLINK-12573
 URL: https://issues.apache.org/jira/browse/FLINK-12573
 Project: Flink
  Issue Type: Improvement
Reporter: yitzchak lieberman


A possibility to add a suffix to the part file path, instead of only:

new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
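For illustration, a sketch of what a configurable suffix could look like; the 
partSuffix option below is hypothetical, not existing API:

{code:java}
// Sketch: append an optional, user-configured suffix after the part counter.
// 'partSuffix' is an assumed new configuration option of the sink/bucket.
String partFileName = PART_PREFIX + '-' + subtaskIndex + '-' + partCounter
    + (partSuffix == null || partSuffix.isEmpty() ? "" : partSuffix);
return new Path(bucketPath, partFileName);
{code}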

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12572) Implement TableSource and InputFormat to read Hive tables

2019-05-21 Thread zjuwangg (JIRA)
zjuwangg created FLINK-12572:


 Summary: Implement TableSource and InputFormat to read Hive tables
 Key: FLINK-12572
 URL: https://issues.apache.org/jira/browse/FLINK-12572
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / Ecosystem
Affects Versions: 1.9.0
Reporter: zjuwangg






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12571) Make NetworkEnvironment#start() return the bound data port

2019-05-21 Thread zhijiang (JIRA)
zhijiang created FLINK-12571:


 Summary: Make NetworkEnvironment#start() return the bound data 
port
 Key: FLINK-12571
 URL: https://issues.apache.org/jira/browse/FLINK-12571
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang


Currently `NetworkEnvironment#getConnectionManager()` is mainly used by 
`TaskManagerServices` to get the bound data port from 
`NettyConnectionManager`. Since the `ConnectionManager` is an internal 
component of `NetworkEnvironment`, it should not be exposed to the outside. 
Other ShuffleService implementations might have no `ConnectionManager` at all.

We could make `ShuffleService#start()` return the bound data port, replacing 
`getConnectionManager`. For `LocalConnectionManager` or other shuffle 
service implementations that have no bound data port, returning a 
simple default value would do no harm.
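A sketch of what the proposed signature could look like; the interface shape and the 
default value (-1 for "no data port") are assumptions for illustration:

{code:java}
// Sketch only: start() returns the bound data port instead of exposing the
// ConnectionManager. Implementations without a network stack return -1.
public interface ShuffleService {

    /**
     * Starts the shuffle service and returns the data port it is bound to,
     * or -1 if this implementation does not bind a data port.
     */
    int start() throws IOException;

    void shutdown();
}
{code}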



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12570) Switch Task from ResultPartition to ResultPartitionWriter interface

2019-05-21 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-12570:
---

 Summary: Switch Task from ResultPartition to ResultPartitionWriter 
interface
 Key: FLINK-12570
 URL: https://issues.apache.org/jira/browse/FLINK-12570
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.9.0


This is part of the Shuffle API refactoring: make Task not depend on the concrete 
implementation of ResultPartitionWriter (ResultPartition).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12569) Broken/Outdated docker/build.sh for flink 1.8.0

2019-05-21 Thread Ankit Chaudhary (JIRA)
Ankit Chaudhary created FLINK-12569:
---

 Summary: Broken/Outdated docker/build.sh for flink 1.8.0
 Key: FLINK-12569
 URL: https://issues.apache.org/jira/browse/FLINK-12569
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Docker
Affects Versions: 1.8.0
Reporter: Ankit Chaudhary


I was reading through the release notes of Flink 1.8.0 and found out that we 
are no longer releasing binaries for specific Hadoop versions 
(https://issues.apache.org/jira/browse/FLINK-11266).

Because of this change, the docker build script for the job-cluster docker image 
fails when CLI arguments for a specific Flink, Hadoop, and Scala version are 
supplied, because no such file exists at the remote location 
"https://archive.apache.org/dist/flink/flink-${FLINK_VERSION}/${FLINK_DIST_FILE_NAME}".

The source is located here: 
[https://github.com/apache/flink/tree/release-1.8/flink-container/docker]

Following is the failure I received:
{code:java}
Step 11/16 : RUN set -x && ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && ln 
-s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && addgroup -S flink && adduser 
-D -S -H -G flink -h $FLINK_HOME flink && chown -R flink:flink 
$FLINK_INSTALL_PATH/flink-* && chown -h flink:flink $FLINK_HOME
---> Running in b28403d5820b
+ ln -s /opt/flink-1.8.0-bin-hadoop28-scala_2.11.tgz /opt/flink
+ ln -s /opt/job.jar /opt/flink/lib
ln: /opt/flink/lib: Not a directory
The command '/bin/sh -c set -x && ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME 
&& ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && addgroup -S flink && 
adduser -D -S -H -G flink -h $FLINK_HOME flink && chown -R flink:flink 
$FLINK_INSTALL_PATH/flink-* && chown -h flink:flink $FLINK_HOME' returned a 
non-zero code: 1
{code}
Following was the command passed:
{code:java}
./build.sh --job-jar  --from-release --flink-version 1.8.0 
--hadoop-version 2.8 --scala-version 2.11 --image-name flink-custom-image:0.0.0

{code}
 

I think we should change the script and update the readme file to reflect the 
changes done via FLINK-11266.

 

NOTE:
 * I am not sure if the Jira component I have selected is correct or not.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12568) Implement TableSink and OutputFormat to write Hive tables

2019-05-21 Thread Rui Li (JIRA)
Rui Li created FLINK-12568:
--

 Summary: Implement TableSink and OutputFormat to write Hive tables
 Key: FLINK-12568
 URL: https://issues.apache.org/jira/browse/FLINK-12568
 Project: Flink
  Issue Type: Sub-task
Reporter: Rui Li
Assignee: Rui Li






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12567) Rework DescriptorProperties to adapt unified DDL with clause and Descriptor key value pairs

2019-05-21 Thread Danny Chan (JIRA)
Danny Chan created FLINK-12567:
--

 Summary: Rework DescriptorProperties to adapt unified DDL with 
clause and Descriptor key value pairs
 Key: FLINK-12567
 URL: https://issues.apache.org/jira/browse/FLINK-12567
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.8.0
Reporter: Danny Chan
Assignee: Danny Chan
 Fix For: 1.9.0


After introducing DDLs, we need to unify the key-value properties format between the 
DDL WITH clause and the Descriptor API; this tool class also needs to be reworked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12566) Remove row interval type

2019-05-21 Thread Timo Walther (JIRA)
Timo Walther created FLINK-12566:


 Summary: Remove row interval type
 Key: FLINK-12566
 URL: https://issues.apache.org/jira/browse/FLINK-12566
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


The row interval type just adds additional complexity and prevents SQL queries 
from supporting count windows. A regular {{BIGINT}} type is sufficient to 
represent a count.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)