[jira] [Created] (FLINK-35208) Respect pipeline.cached-files during processing Python dependencies

2024-04-22 Thread Dian Fu (Jira)
Dian Fu created FLINK-35208:
---

 Summary: Respect pipeline.cached-files during processing Python 
dependencies
 Key: FLINK-35208
 URL: https://issues.apache.org/jira/browse/FLINK-35208
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently, PyFlink uses the distributed cache (it updates 
StreamExecutionEnvironment#cachedFiles) when handling Python 
dependencies (see 
[https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java#L339]
 for more details). 

However, if pipeline.cached-files is configured, 
StreamExecutionEnvironment#cachedFiles is cleared (see 
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1132]
 for more details), which may break the Python dependency handling described 
above.
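
The interaction can be illustrated with a toy model. This is a minimal sketch in 
plain Python, not Flink's code: apply_config_replacing and apply_config_merging 
are hypothetical names, the file names and paths are made up, and the merging 
variant is only one possible direction for a fix, not necessarily what this 
ticket will implement.

```python
# Toy model of a cached-files registry; every name here is hypothetical.

def apply_config_replacing(cached_files, configured):
    """Clear the registry, then add the configured entries (the problematic behaviour)."""
    cached_files.clear()
    cached_files.extend(configured)
    return cached_files


def apply_config_merging(cached_files, configured):
    """Keep existing entries and add configured ones whose name is not registered yet."""
    known = {name for name, _ in cached_files}
    for name, path in configured:
        if name not in known:
            cached_files.append((name, path))
            known.add(name)
    return cached_files


# Entry registered while handling Python dependencies (made-up values).
python_deps = [("python_file_0", "/tmp/udfs.py")]
# Entry coming from pipeline.cached-files (made-up values).
user_config = [("lookup", "/tmp/lookup.csv")]

replaced = apply_config_replacing(list(python_deps), user_config)
merged = apply_config_merging(list(python_deps), user_config)
```

In the replacing variant the Python dependency entry is lost; in the merging 
variant both entries survive.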



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34985) It doesn't support to access fields by name for map function in thread mode

2024-04-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-34985:
---

 Summary: It doesn't support to access fields by name for map 
function in thread mode
 Key: FLINK-34985
 URL: https://issues.apache.org/jira/browse/FLINK-34985
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu


Reported in the Slack channel: 
[https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]

Hi all, I seem to be running into an issue when switching to thread mode in 
PyFlink. In a UDF the {{Row}} seems to get converted into a tuple and you 
cannot access fields by their name anymore. In process mode it works fine. This 
bug can easily be reproduced using this minimal example, which is close to the 
PyFlink docs:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")


# This does work:
t_env.get_config().set("python.execution-mode", "process")

# This doesn't work:
#t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)
table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
 
The exception I get in this execution mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: : 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at .(:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
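
Until name-based access works in thread mode, positional access may serve as a 
workaround, since it behaves the same on a row with named fields and on a plain 
tuple. The sketch below only illustrates that idea: NamedRow is a hypothetical 
stand-in built on namedtuple (it is not pyflink.common.Row), and it assumes, 
based on the traceback above, that the UDF input in thread mode is a plain 
tuple.

```python
from collections import namedtuple

# Hypothetical stand-in for a row with named fields; not pyflink.common.Row.
NamedRow = namedtuple("NamedRow", ["a", "b"])


def map_by_position(value):
    """Compute (a + 1, b * b) using positional access only."""
    return (value[0] + 1, value[1] * value[1])


process_mode_input = NamedRow(a=2, b=4)  # fields are reachable by name and by index
thread_mode_input = (2, 4)               # assumption: plain tuple, index access only

result_named = map_by_position(process_mode_input)
result_tuple = map_by_position(thread_mode_input)
```

Both calls return the same value, whereas a.a would raise AttributeError on the 
plain tuple.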





[jira] [Created] (FLINK-32989) PyFlink wheel package build failed

2023-08-29 Thread Dian Fu (Jira)
Dian Fu created FLINK-32989:
---

 Summary: PyFlink wheel package build failed
 Key: FLINK-32989
 URL: https://issues.apache.org/jira/browse/FLINK-32989
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.0
Reporter: Dian Fu


{code}
Compiling pyflink/fn_execution/coder_impl_fast.pyx because it changed. 
Compiling pyflink/fn_execution/table/aggregate_fast.pyx because it changed. 
Compiling pyflink/fn_execution/table/window_aggregate_fast.pyx because it changed.
Compiling pyflink/fn_execution/stream_fast.pyx because it changed.
Compiling pyflink/fn_execution/beam/beam_stream_fast.pyx because it changed.
Compiling pyflink/fn_execution/beam/beam_coder_impl_fast.pyx because it changed.
Compiling pyflink/fn_execution/beam/beam_operations_fast.pyx because it changed.
[1/7] Cythonizing pyflink/fn_execution/beam/beam_coder_impl_fast.pyx
[2/7] Cythonizing pyflink/fn_execution/beam/beam_operations_fast.pyx
[3/7] Cythonizing pyflink/fn_execution/beam/beam_stream_fast.pyx
[4/7] Cythonizing pyflink/fn_execution/coder_impl_fast.pyx
[5/7] Cythonizing pyflink/fn_execution/stream_fast.pyx
[6/7] Cythonizing pyflink/fn_execution/table/aggregate_fast.pyx
[7/7] Cythonizing pyflink/fn_execution/table/window_aggregate_fast.pyx
/home/vsts/work/1/s/flink-python/dev/.conda/envs/3.7/lib/python3.7/site-packages/Cython/Compiler/Main.py:369: FutureWarning: Cython directive 'language_level' not set, using 2 for now (Py2). This will change in a later release! File: /home/vsts/work/1/s/flink-python/pyflink/fn_execution/table/window_aggregate_fast.pxd
  tree = Parsing.p_module(s, pxd, full_module_name)
Exactly one Flink home directory must exist, but found: []
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=52740=logs=d15e2b2e-10cd-5f59-7734-42d57dc5564d=4a86776f-e6e1-598a-f75a-c43d8b819662





[jira] [Created] (FLINK-32724) Mark CEP API classes as Public / PublicEvolving

2023-08-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-32724:
---

 Summary: Mark CEP API classes as Public / PublicEvolving
 Key: FLINK-32724
 URL: https://issues.apache.org/jira/browse/FLINK-32724
 Project: Flink
  Issue Type: Improvement
  Components: Library / CEP
Reporter: Dian Fu


Currently, most CEP API classes, e.g. Pattern and PatternSelectFunction, are not 
annotated as Public / PublicEvolving. We should annotate them to make it clear 
which classes are part of the public API.





[jira] [Created] (FLINK-32131) Support specifying hadoop config dir for Python HiveCatalog

2023-05-18 Thread Dian Fu (Jira)
Dian Fu created FLINK-32131:
---

 Summary: Support specifying hadoop config dir for Python 
HiveCatalog
 Key: FLINK-32131
 URL: https://issues.apache.org/jira/browse/FLINK-32131
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.18.0


The Hadoop config directory can be specified for HiveCatalog in Java; however, 
this is not yet supported in the Python HiveCatalog. This JIRA is to align them.





[jira] [Created] (FLINK-31949) The state of CountTumblingWindowAssigner of Python DataStream API were never purged

2023-04-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-31949:
---

 Summary: The state of CountTumblingWindowAssigner of Python 
DataStream API were never purged
 Key: FLINK-31949
 URL: https://issues.apache.org/jira/browse/FLINK-31949
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu


Reported by Urs on the user mailing list:
{code:java}
"In FLINK-26444, a couple of convenience window assigners were added to
the Python Datastream API, including CountTumblingWindowAssigner. This
assigner uses a CountTrigger by default, which produces TriggerResult.FIRE.

As such, using this window assigner on a data stream will always produce
a "state leak" since older count windows will always be retained without
any chance to work on the elements again."
{code}
See [https://lists.apache.org/thread/ql8x283xzgd98z0vsqr9npl5j74hscsm] for more 
details.
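
The difference between firing and firing with a purge can be shown with a toy 
simulation. This is a minimal sketch in plain Python, not PyFlink's window 
operator: the TriggerResult enum here is a local stand-in, and 
run_count_windows is a hypothetical helper that models per-window state as a 
dictionary.

```python
from enum import Enum


class TriggerResult(Enum):
    """Local stand-in for the trigger results discussed above."""
    CONTINUE = 0
    FIRE = 1
    FIRE_AND_PURGE = 2


def run_count_windows(elements, window_size, result_when_full):
    """Simulate tumbling count windows; return (emitted windows, retained state)."""
    state = {}    # window index -> buffered elements
    emitted = []
    for position, element in enumerate(elements):
        window = position // window_size
        state.setdefault(window, []).append(element)
        if len(state[window]) == window_size:
            emitted.append(list(state[window]))
            if result_when_full is TriggerResult.FIRE_AND_PURGE:
                del state[window]  # purge: drop the window's buffered elements
    return emitted, state


emitted_fire, leaked = run_count_windows(range(6), 2, TriggerResult.FIRE)
emitted_purge, retained = run_count_windows(range(6), 2, TriggerResult.FIRE_AND_PURGE)
```

Both runs emit the same windows, but with FIRE alone every completed window 
stays in state, which matches the "state leak" described in the report.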





[jira] [Created] (FLINK-31905) Exception thrown when accessing nested field of the result of Python UDF with complex type

2023-04-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-31905:
---

 Summary: Exception thrown when accessing nested field of the 
result of Python UDF with complex type
 Key: FLINK-31905
 URL: https://issues.apache.org/jira/browse/FLINK-31905
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu


For the following job:
{code}
import logging, sys

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import Schema, DataTypes, TableDescriptor, StreamTableEnvironment
from pyflink.table.expressions import col, row
from pyflink.table.udf import ACC, T, udaf, AggregateFunction, udf

logging.basicConfig(stream=sys.stdout, level=logging.ERROR,
                    format="%(message)s")


class EmitLastState(AggregateFunction):
    """
    Aggregator that emits the latest state for the purpose of
    enabling parallelism on CDC tables.
    """

    def create_accumulator(self) -> ACC:
        return Row(None, None)

    def accumulate(self, accumulator: ACC, *args):
        key, obj = args
        if (accumulator[0] is None) or (key > accumulator[0]):
            accumulator[0] = key
            accumulator[1] = obj

    def retract(self, accumulator: ACC, *args):
        pass

    def get_value(self, accumulator: ACC) -> T:
        return accumulator[1]


some_complex_inner_type = DataTypes.ROW(
    [
        DataTypes.FIELD("f0", DataTypes.STRING()),
        DataTypes.FIELD("f1", DataTypes.STRING())
    ]
)

some_complex_type = DataTypes.ROW(
    [
        DataTypes.FIELD(k, DataTypes.ARRAY(some_complex_inner_type))
        for k in ("f0", "f1", "f2")
    ]
    + [
        DataTypes.FIELD("f3", DataTypes.DATE()),
        DataTypes.FIELD("f4", DataTypes.VARCHAR(32)),
        DataTypes.FIELD("f5", DataTypes.VARCHAR(2)),
    ]
)

@udf(input_types=DataTypes.STRING(), result_type=some_complex_type)
def complex_udf(s):
    return Row(f0=None, f1=None, f2=None, f3=None, f4=None, f5=None)


if __name__ == "__main__":
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    table_env.get_config().set(
        'pipeline.classpaths',
        'file:///Users/dianfu/code/src/workspace/pyflink-examples/flink-sql-connector-postgres-cdc-2.1.1.jar')

    # Create schema
    _schema = {
        "p_key": DataTypes.INT(False),
        "modified_id": DataTypes.INT(False),
        "content": DataTypes.STRING()
    }
    schema = Schema.new_builder().from_fields(
        *zip(*[(k, v) for k, v in _schema.items()])
    ).\
        primary_key("p_key").\
        build()

    # Create table descriptor
    descriptor = TableDescriptor.for_connector("postgres-cdc").\
        option("hostname", "host.docker.internal").\
        option("port", "5432").\
        option("database-name", "flink_issue").\
        option("username", "root").\
        option("password", "root").\
        option("debezium.plugin.name", "pgoutput").\
        option("schema-name", "flink_schema").\
        option("table-name", "flink_table").\
        option("slot.name", "flink_slot").\
        schema(schema).\
        build()

    table_env.create_temporary_table("flink_table", descriptor)

    # Create changelog stream
    stream = table_env.from_path("flink_table")

    # Define UDAF
    accumulator_type = DataTypes.ROW(
        [
            DataTypes.FIELD("f0", DataTypes.INT(False)),
            DataTypes.FIELD("f1", DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])),
        ]
    )
    result_type = DataTypes.ROW([DataTypes.FIELD(k, v) for k, v in _schema.items()])
    emit_last = udaf(EmitLastState(), accumulator_type=accumulator_type,
                     result_type=result_type)

    # Emit last state based on modified_id to enable parallel processing
    stream = stream.\
        group_by(col("p_key")).\
        select(
            col("p_key"),
            emit_last(col("modified_id"),
                      row(*(col(k) for k in _schema.keys())).cast(result_type)).alias("tmp_obj")
        )

    # Select the elements of the objects
    stream = stream.select(*(col("tmp_obj").get(k).alias(k) for k in _schema.keys()))

    # We apply a UDF which parses the xml and returns a complex nested structure
    stream = stream.select(col("p_key"),
                           complex_udf(col("content")).alias("nested_obj"))

    # We select an element from the nested structure in order to flatten it
    # The next line is the line causing issues, commenting the next line will
    # make the pipeline work
    stream = stream.select(col("p_key"), col("nested_obj").get("f0"))

    # Interestingly, the below part does work...
    # stream = stream.select(col("nested_obj").get("f0"))

    table_env.to_changelog_stream(stream).print()

    # Execute
    env.execute_async()
{code}

{code}
py4j.protocol.Py4JJavaError: An error occurred while calling 
o8.toChangelogStream.
: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
at 

[jira] [Created] (FLINK-31861) Introduce ByteArraySchema which serialize/deserialize data of type byte array

2023-04-19 Thread Dian Fu (Jira)
Dian Fu created FLINK-31861:
---

 Summary: Introduce ByteArraySchema which serialize/deserialize 
data of type byte array
 Key: FLINK-31861
 URL: https://issues.apache.org/jira/browse/FLINK-31861
 Project: Flink
  Issue Type: Improvement
  Components: API / Core
Reporter: Dian Fu


The aim of this ticket is to introduce ByteArraySchema, which 
serializes/deserializes data of type byte array. With it, users can get 
the raw bytes from a data source. See 
[https://apache-flink.slack.com/archives/C03G7LJTS2G/p1681928862762699] for 
more details.
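
Conceptually, such a schema is a pass-through in both directions. The following 
is a minimal sketch of that idea in plain Python; ByteArraySchemaSketch and its 
method names are hypothetical and are not the class or API proposed by this 
ticket.

```python
class ByteArraySchemaSketch:
    """Hypothetical pass-through schema: bytes in, bytes out."""

    def deserialize(self, message) -> bytes:
        # Hand the raw payload to the user without decoding it.
        return bytes(message)

    def serialize(self, element) -> bytes:
        # Write the user's bytes unchanged.
        return bytes(element)


schema = ByteArraySchemaSketch()
payload = b"\x00\x01binary\xff"
round_tripped = schema.serialize(schema.deserialize(payload))
from_bytearray = schema.deserialize(bytearray(b"ab"))
```

Because nothing is decoded, payloads that are not valid text survive the round 
trip unchanged.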





[jira] [Created] (FLINK-31707) Constant string cannot be used as input arguments of Pandas UDAF

2023-04-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-31707:
---

 Summary: Constant string cannot be used as input arguments of 
Pandas UDAF
 Key: FLINK-31707
 URL: https://issues.apache.org/jira/browse/FLINK-31707
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


The following exception is thrown when a constant string is used as an input 
argument of a Pandas UDAF:
{code}
E   raise ValueError("field_type %s is not supported." % field_type)
E   ValueError: field_type type_name: CHAR
E   char_info {
E     length: 3
E   }
E   is not supported.
{code}





[jira] [Created] (FLINK-31690) The current key is not set for KeyedCoProcessOperator

2023-04-02 Thread Dian Fu (Jira)
Dian Fu created FLINK-31690:
---

 Summary: The current key is not set for KeyedCoProcessOperator
 Key: FLINK-31690
 URL: https://issues.apache.org/jira/browse/FLINK-31690
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


See https://apache-flink.slack.com/archives/C03G7LJTS2G/p1680294701254239 for 
more details.





[jira] [Created] (FLINK-31532) Support StreamExecutionEnvironment.socketTextStream in Python DataStream API

2023-03-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-31532:
---

 Summary: Support StreamExecutionEnvironment.socketTextStream in 
Python DataStream API
 Key: FLINK-31532
 URL: https://issues.apache.org/jira/browse/FLINK-31532
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu


Currently, StreamExecutionEnvironment.socketTextStream is still missing in the 
Python DataStream API. It would be great to support it, as it can be helpful in 
special cases, e.g. testing.
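
For readers unfamiliar with the Java method, the behaviour being asked for is 
roughly "read delimiter-separated text records from a socket". The sketch below 
shows that behaviour with the Python standard library only; socket_text_stream 
and demo are hypothetical helpers, not a PyFlink API, and the newline delimiter 
and UTF-8 encoding are assumptions.

```python
import socket


def socket_text_stream(sock, delimiter=b"\n", encoding="utf-8"):
    """Yield delimiter-separated text records read from a connected socket."""
    buffer = b""
    while True:
        chunk = sock.recv(4096)
        if not chunk:  # peer closed the connection
            break
        buffer += chunk
        while delimiter in buffer:
            record, buffer = buffer.split(delimiter, 1)
            yield record.decode(encoding)
    if buffer:  # emit a trailing record that had no final delimiter
        yield buffer.decode(encoding)


def demo():
    """Feed a few records through a local socket pair and collect them."""
    writer, reader = socket.socketpair()
    try:
        writer.sendall(b"hello\nworld\npartial")
        writer.close()
        return list(socket_text_stream(reader))
    finally:
        reader.close()


lines = demo()
```

A local socket pair like this is also the kind of setup that makes such a source 
convenient in tests.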





[jira] [Created] (FLINK-31503) "org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype" is thrown when executing Python UDFs in SQL Client

2023-03-17 Thread Dian Fu (Jira)
Dian Fu created FLINK-31503:
---

 Summary: "org.apache.beam.sdk.options.PipelineOptionsRegistrar: 
Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a 
subtype" is thrown when executing Python UDFs in SQL Client 
 Key: FLINK-31503
 URL: https://issues.apache.org/jira/browse/FLINK-31503
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


The following exception will be thrown when executing SQL statements containing 
Python UDFs in SQL Client:
{code}
Caused by: java.util.ServiceConfigurationError: org.apache.beam.sdk.options.PipelineOptionsRegistrar: Provider org.apache.beam.sdk.options.DefaultPipelineOptionsRegistrar not a subtype
    at java.util.ServiceLoader.fail(ServiceLoader.java:239)
    at java.util.ServiceLoader.access$300(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:376)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableCollection$Builder.addAll(ImmutableCollection.java:415)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet$Builder.addAll(ImmutableSet.java:507)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSortedSet$Builder.addAll(ImmutableSortedSet.java:528)
    at org.apache.beam.sdk.util.common.ReflectHelpers.loadServicesOrdered(ReflectHelpers.java:199)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.initializeRegistry(PipelineOptionsFactory.java:2089)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.(PipelineOptionsFactory.java:2083)
    at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.(PipelineOptionsFactory.java:2047)
    at org.apache.beam.sdk.options.PipelineOptionsFactory.resetCache(PipelineOptionsFactory.java:581)
    at org.apache.beam.sdk.options.PipelineOptionsFactory.(PipelineOptionsFactory.java:547)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:241)
{code}





[jira] [Created] (FLINK-31478) TypeError: a bytes-like object is required, not 'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream

2023-03-15 Thread Dian Fu (Jira)
Dian Fu created FLINK-31478:
---

 Summary: TypeError: a bytes-like object is required, not 
'JavaList' is thrown when ds.execute_and_collect() is called on a KeyedStream
 Key: FLINK-31478
 URL: https://issues.apache.org/jira/browse/FLINK-31478
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


{code}

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
                                                       OutputFileConfig, RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
   "The slings and arrows of outrageous fortune",
   "Or to take arms against a sea of troubles,",
   "And by opposing end them?--To die,--to sleep,--",
   "No more; and by a sleep to say we end",
   "The heartache, and the thousand natural shocks",
   "That flesh is heir to,--'tis a consummation",
   "Devoutly to be wish'd. To die,--to sleep;--",
   "To sleep! perchance to dream:--ay, there's the rub;",
   "For in that sleep of death what dreams may come,",
   "When we have shuffled off this mortal coil,",
   "Must give us pause: there's the respect",
   "That makes calamity of so long life;",
   "For who would bear the whips and scorns of time,",
   "The oppressor's wrong, the proud man's contumely,",
   "The pangs of despis'd love, the law's delay,",
   "The insolence of office, and the spurns",
   "That patient merit of the unworthy takes,",
   "When he himself might his quietus make",
   "With a bare bodkin? who would these fardels bear,",
   "To grunt and sweat under a weary life,",
   "But that the dread of something after death,--",
   "The undiscover'd country, from whose bourn",
   "No traveller returns,--puzzles the will,",
   "And makes us rather bear those ills we have",
   "Than fly to others that we know not of?",
   "Thus conscience does make cowards of us all;",
   "And thus the native hue of resolution",
   "Is sicklied o'er with the pale cast of thought;",
   "And enterprises of great pith and moment,",
   "With this regard, their currents turn awry,",
   "And lose the name of action.--Soft you now!",
   "The fair Ophelia!--Nymph, in thy orisons",
   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()

# 

[jira] [Created] (FLINK-31286) Python processes are still alive when shutting down a session cluster directly without stopping the jobs

2023-03-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-31286:
---

 Summary: Python processes are still alive when shutting down a 
session cluster directly without stopping the jobs
 Key: FLINK-31286
 URL: https://issues.apache.org/jira/browse/FLINK-31286
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Attachments: image-2023-03-02-10-55-34-863.png

Steps to reproduce:
1) Start a standalone cluster: ./bin/start-cluster.sh
2) Submit a PyFlink job which contains Python UDFs
3) Stop the cluster: ./bin/stop-cluster.sh
4) Check whether the Python process still exists: ps aux | grep -i beam_boot

!image-2023-03-02-10-55-34-863.png!





[jira] [Created] (FLINK-31272) Duplicate operators appear in the StreamGraph for Python DataStream API jobs

2023-02-28 Thread Dian Fu (Jira)
Dian Fu created FLINK-31272:
---

 Summary: Duplicate operators appear in the StreamGraph for Python 
DataStream API jobs
 Key: FLINK-31272
 URL: https://issues.apache.org/jira/browse/FLINK-31272
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.16.0
Reporter: Dian Fu


For the following job:
{code}
import argparse
import json
import sys
import time
from typing import Iterable, cast

from pyflink.common import Types, Time, Encoder
from pyflink.datastream import StreamExecutionEnvironment, \
    ProcessWindowFunction, EmbeddedRocksDBStateBackend, \
    PredefinedOptions, FileSystemCheckpointStorage, CheckpointingMode, \
    ExternalizedCheckpointCleanup
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy, \
    OutputFileConfig
from pyflink.datastream.state import ReducingState, ReducingStateDescriptor
from pyflink.datastream.window import TimeWindow, Trigger, TriggerResult, T, \
    TumblingProcessingTimeWindows, \
    ProcessingTimeTrigger


class CountWithProcessTimeoutTrigger(ProcessingTimeTrigger):

    def __init__(self, window_size: int):
        self._window_size = window_size
        self._count_state_descriptor = ReducingStateDescriptor(
            "count", lambda a, b: a + b, Types.LONG())

    @staticmethod
    def of(window_size: int) -> 'CountWithProcessTimeoutTrigger':
        return CountWithProcessTimeoutTrigger(window_size)

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: TimeWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        count_state = cast(ReducingState,
                           ctx.get_partitioned_state(self._count_state_descriptor))
        count_state.add(1)
        # print("element arrive:", element, "count_state:", count_state.get(),
        #       window.max_timestamp(), ctx.get_current_watermark())

        if count_state.get() >= self._window_size:  # must fire
            print("fire element count", element, count_state.get(),
                  window.max_timestamp(), ctx.get_current_watermark())
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        if timestamp >= window.end:
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE
        else:
            return TriggerResult.CONTINUE

    def on_processing_time(self,
                           timestamp: int,
                           window: TimeWindow,
                           ctx: Trigger.TriggerContext) -> TriggerResult:
        if timestamp >= window.end:
            return TriggerResult.CONTINUE
        else:
            print("fire with process_time:", timestamp)
            count_state = cast(ReducingState,
                               ctx.get_partitioned_state(self._count_state_descriptor))
            count_state.clear()
            return TriggerResult.FIRE_AND_PURGE

    def on_event_time(self,
                      timestamp: int,
                      window: TimeWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        return TriggerResult.CONTINUE

    def clear(self,
              window: TimeWindow,
              ctx: 'Trigger.TriggerContext') -> None:
        count_state = ctx.get_partitioned_state(self._count_state_descriptor)
        count_state.clear()


def to_dict_map(v):
    time.sleep(1)
    dict_value = json.loads(v)
    return dict_value


def get_group_key(value, keys):
    group_key_values = []
    for key in keys:
        one_key_value = 'null'
        if key in value:
            list_value = value[key]
            if list_value:
                one_key_value = str(list_value[0])
        group_key_values.append(one_key_value)
    group_key = '_'.join(group_key_values)
    # print("group_key=", group_key)
    return group_key


class CountWindowProcessFunction(ProcessWindowFunction[dict, dict, str, TimeWindow]):

    def __init__(self, uf):
        self._user_function = uf

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[dict]) -> Iterable[dict]:
        result_list = self._user_function.process_after_group_by_function(elements)
        return result_list


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # process time
    env.get_config().set_auto_watermark_interval(0)
    state_backend = EmbeddedRocksDBStateBackend(True)

[jira] [Created] (FLINK-31172) Support coGroup in Python DataStream API

2023-02-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-31172:
---

 Summary: Support coGroup in Python DataStream API
 Key: FLINK-31172
 URL: https://issues.apache.org/jira/browse/FLINK-31172
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Dian Fu


The aim of this ticket is to support 
[coGroup|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/overview/#window-cogroup]
 in Python DataStream API.
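
As a reminder of the semantics, coGroup groups two inputs by key and hands both 
groups for each key to a user function, including keys present on only one 
side. The sketch below models this in plain Python for a single batch of 
elements (i.e. it ignores windowing); co_group and its parameters are 
hypothetical names and say nothing about how the Python DataStream API will 
expose the operation.

```python
from collections import defaultdict


def co_group(left, right, left_key, right_key, fn):
    """Group both inputs by key and apply fn(key, left_items, right_items) per key."""
    groups = defaultdict(lambda: ([], []))
    for item in left:
        groups[left_key(item)][0].append(item)
    for item in right:
        groups[right_key(item)][1].append(item)
    results = []
    for key in sorted(groups):  # sorted only to make the output deterministic
        left_items, right_items = groups[key]
        results.extend(fn(key, left_items, right_items))
    return results


orders = [("a", 1), ("b", 2), ("a", 3)]
payments = [("a", 10), ("c", 30)]

counts = co_group(
    orders,
    payments,
    left_key=lambda item: item[0],
    right_key=lambda item: item[0],
    fn=lambda key, lefts, rights: [(key, len(lefts), len(rights))],
)
```

Unlike a join, the user function is also called for "b" and "c", where one side 
is empty.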





[jira] [Created] (FLINK-31171) Support iterate in Python DataStream API

2023-02-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-31171:
---

 Summary: Support iterate in Python DataStream API
 Key: FLINK-31171
 URL: https://issues.apache.org/jira/browse/FLINK-31171
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Dian Fu


The aim of this ticket is to support 
[iterate|https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/overview/#iterate]
 in Python DataStream API.





[jira] [Created] (FLINK-31043) KeyError exception is thrown in CachedMapState

2023-02-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-31043:
---

 Summary: KeyError exception is thrown in CachedMapState
 Key: FLINK-31043
 URL: https://issues.apache.org/jira/browse/FLINK-31043
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.15.3
Reporter: Dian Fu


The following exception has been seen in a PyFlink job running on Flink 1.15. It 
happens occasionally and may indicate a bug in the state cache of MapState:
{code:java}
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1875)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1846)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2010)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1999)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1079)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1028)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:954)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:933)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    at java.lang.Thread.run(Thread.java:834)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
    ... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:2008)
    ... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:387)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:371)
    at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 131: Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in 
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/python3/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1000, in process_bundle

[jira] [Created] (FLINK-31031) Disable the output buffer of Python process to make it more convenient for interactive users

2023-02-12 Thread Dian Fu (Jira)
Dian Fu created FLINK-31031:
---

 Summary: Disable the output buffer of Python process to make it 
more convenient for interactive users 
 Key: FLINK-31031
 URL: https://issues.apache.org/jira/browse/FLINK-31031
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30364) Job failed without exception stack

2022-12-11 Thread Dian Fu (Jira)
Dian Fu created FLINK-30364:
---

 Summary: Job failed without exception stack
 Key: FLINK-30364
 URL: https://issues.apache.org/jira/browse/FLINK-30364
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.16.0, 1.15.0, 1.14.0, 1.13.0
Reporter: Dian Fu


{code}
2022-12-12 08:01:19,985 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - route map function 
-> Filter -> Timestamps/Watermarks with job vertex id 
4bf7c1955ffe56e2106d666433eaf137 (3/30) (96f1efb2ad67ab19cae4d81548ae58e9) 
switched from RUNNING to FAILED on 
job-ee140d6c-483b-48c3-961d-b02981d4ba99-taskmanager-1-1.
java.lang.NullPointerException: null
{code}





[jira] [Created] (FLINK-30308) ClassCastException: class java.io.ObjectStreamClass$Caches$1 cannot be cast to class java.util.Map is showing in the logging when the job shutdown

2022-12-05 Thread Dian Fu (Jira)
Dian Fu created FLINK-30308:
---

 Summary: ClassCastException: class 
java.io.ObjectStreamClass$Caches$1 cannot be cast to class java.util.Map is 
showing in the logging when the job shutdown
 Key: FLINK-30308
 URL: https://issues.apache.org/jira/browse/FLINK-30308
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.17.0, 1.16.1, 1.15.4


{code:java}
2022-12-05 18:26:40,229 WARN  
org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - Failed to 
clean up the leaking objects.
java.lang.ClassCastException: class java.io.ObjectStreamClass$Caches$1 cannot 
be cast to class java.util.Map (java.io.ObjectStreamClass$Caches$1 and 
java.util.Map are in module java.base of loader 'bootstrap')
at 
org.apache.flink.streaming.api.utils.ClassLeakCleaner.clearCache(ClassLeakCleaner.java:58)
 
~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0]
at 
org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses(ClassLeakCleaner.java:39)
 
~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0]
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.close(AbstractPythonFunctionOperator.java:142)
 
~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0]
at 
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.close(AbstractExternalPythonFunctionOperator.java:73)
 
~[blob_p-a72e14b9030c3ca0d3d0a8fc6e70166c7419d431-f7f18b2164971cb6798db9ab762feabd:1.15.0]
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
 ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254) 
~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:930)
 ~[flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 [flink-dist-1.15.2.jar:1.15.2]
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:930) 
[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) 
[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) 
[flink-dist-1.15.2.jar:1.15.2]
at java.lang.Thread.run(Unknown Source) [?:?]{code}

Reported in Slack: 
https://apache-flink.slack.com/archives/C03G7LJTS2G/p1670265131083639?thread_ts=1670265114.640369&cid=C03G7LJTS2G





[jira] [Created] (FLINK-29192) Add completeness test for built-in functions between Java and Python Table API

2022-09-05 Thread Dian Fu (Jira)
Dian Fu created FLINK-29192:
---

 Summary: Add completeness test for built-in functions between Java 
and Python Table API
 Key: FLINK-29192
 URL: https://issues.apache.org/jira/browse/FLINK-29192
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.17.0








[jira] [Created] (FLINK-28886) Support HybridSource in Python DataStream API

2022-08-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-28886:
---

 Summary: Support HybridSource in Python DataStream API
 Key: FLINK-28886
 URL: https://issues.apache.org/jira/browse/FLINK-28886
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.16.0








[jira] [Created] (FLINK-28253) LocalDateTime is not supported in PyFlink

2022-06-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-28253:
---

 Summary: LocalDateTime is not supported in PyFlink
 Key: FLINK-28253
 URL: https://issues.apache.org/jira/browse/FLINK-28253
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0, 1.14.0
Reporter: Dian Fu


For the following job:
{code}
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    t_env.execute_sql("""
        CREATE TABLE events (
             `id` VARCHAR,
             `source` VARCHAR,
             `resources` VARCHAR,
             `time` TIMESTAMP(3),
             WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
             PRIMARY KEY (`id`) NOT ENFORCED
        ) WITH (
            'connector' = 'filesystem',
             'path' = 'file:///path/to/input',
             'format' = 'csv'
        )
    """)

    events_stream_table = t_env.from_path('events')

    events_stream = t_env.to_data_stream(events_stream_table)
    # Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()])

    # now do some processing - let's filter by the type of event we get

    codebuild_stream = events_stream.filter(
        lambda event: event['source'] == 'aws.codebuild'
    )

    codebuild_stream.print()
    env.execute()
{code}

It reports the following exception:
{code}
Traceback (most recent call last):
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in <module>
    process()
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process
    lambda event: event['source'] == 'aws.codebuild'
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter
    self._j_data_stream.getTransformation().getOutputType())
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type
    TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType(
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type
    j_row_field_types]
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in <listcomp>
    row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
  File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type
    raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)
TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.
{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-28140) Improve the documentation by adding Python examples

2022-06-20 Thread Dian Fu (Jira)
Dian Fu created FLINK-28140:
---

 Summary: Improve the documentation by adding Python examples
 Key: FLINK-28140
 URL: https://issues.apache.org/jira/browse/FLINK-28140
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Dian Fu


Quite a few documentation pages still have only Java/Scala examples. The 
aim of this JIRA is to improve these pages by adding Python 
examples.

Here is a list of documentation pages that need to be improved:
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/temporal_table_function/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/timezone/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tableapi/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/modules/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/application_parameters/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/execution_configuration/
* https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/
* https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
* https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/






[jira] [Created] (FLINK-28114) The path of the Python client interpreter could not point to an archive file in distributed file system

2022-06-17 Thread Dian Fu (Jira)
Dian Fu created FLINK-28114:
---

 Summary: The path of the Python client interpreter could not point 
to an archive file in distributed file system
 Key: FLINK-28114
 URL: https://issues.apache.org/jira/browse/FLINK-28114
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0, 1.15.1


See 
https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java#L178
 for more details about this limitation.

Users could execute PyFlink jobs in YARN application mode as follows:
{code}
./bin/flink run-application -t yarn-application \
  -Djobmanager.memory.process.size=1024m \
  -Dtaskmanager.memory.process.size=1024m \
  -Dyarn.application.name= \
  -Dyarn.ship-files=/path/to/shipfiles \
  -pyarch shipfiles/venv.zip \
  -pyclientexec venv.zip/venv/bin/python3 \
  -pyexec venv.zip/venv/bin/python3 \
  -py shipfiles/word_count.py
{code}

In the above case, venv.zip will be distributed to the TMs via the Flink blob 
server. However, the blob server doesn't support files larger than 2GB. 
See 
https://github.com/apache/flink/blob/ea52732dc48a4f1c5be0925890cd8aa1ea2a11ed/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java#L223
 for more details. This is a very serious problem, as Python users usually tend to 
install a lot of Python libraries inside venv.zip and some Python libraries 
are very large.
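
A minimal sketch of a client-side size check for the archive, assuming the 2GB 
figure mentioned above is the effective limit. `BLOB_LIMIT_BYTES` and 
`fits_in_blob_server` are hypothetical names used for illustration; they are 
not part of Flink.

```python
import os

# Assumption: the blob server cannot handle files larger than 2 GB, as
# described above. Both names below are illustrative, not Flink APIs.
BLOB_LIMIT_BYTES = 2 * 1024 ** 3


def fits_in_blob_server(path, limit=BLOB_LIMIT_BYTES):
    """Return True if the file at `path` is no larger than `limit` bytes."""
    return os.path.getsize(path) <= limit
```

Such a check could be run against venv.zip before submitting the job, so that 
an oversized archive is noticed on the client rather than during distribution.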





[jira] [Created] (FLINK-28071) Support missing built-in functions in Table API

2022-06-14 Thread Dian Fu (Jira)
Dian Fu created FLINK-28071:
---

 Summary: Support missing built-in functions in Table API
 Key: FLINK-28071
 URL: https://issues.apache.org/jira/browse/FLINK-28071
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Dian Fu
 Fix For: 1.16.0


Many built-in functions are not supported in Table API. See 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
 for more details. There are two columns for each built-in function: *SQL 
Function* and *Table Function*. If a function is not supported in *Table API*, 
the *Table Function* column is documented as *N/A*. We need to evaluate each of 
these functions to ensure that they can be used in both SQL and Table API.
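
The evaluation described above could start from a list of the functions whose 
*Table Function* column is *N/A*. A minimal sketch, assuming the documentation 
table has been loaded as (SQL function, Table function) pairs; the rows below 
are placeholders, not real entries from the Flink documentation.

```python
# Hypothetical excerpt of the function table: (SQL function, Table function).
# The entries are placeholders for illustration only.
FUNCTION_TABLE = [
    ("SQL_FUNC_A(x)", "x.func_a()"),
    ("SQL_FUNC_B(x)", "N/A"),
    ("SQL_FUNC_C(x, y)", "N/A"),
]


def missing_in_table_api(rows):
    """Return the SQL functions whose Table API column is documented as N/A."""
    return [sql for sql, table_api in rows if table_api.strip().upper() == "N/A"]


print(missing_in_table_api(FUNCTION_TABLE))
```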





[jira] [Created] (FLINK-28015) FROM_UNIXTIME could not be used in Table API

2022-06-12 Thread Dian Fu (Jira)
Dian Fu created FLINK-28015:
---

 Summary: FROM_UNIXTIME could not be used in Table API
 Key: FLINK-28015
 URL: https://issues.apache.org/jira/browse/FLINK-28015
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Reporter: Dian Fu


This issue is reported in 
[slack|https://apache-flink.slack.com/archives/C03G7LJTS2G/p1655083223954149]. 

For the following code:
{code}
input_table = table_env.from_path(input_table_name)
sliding_window_table = (
    input_table.window(
        Slide.over(sliding_window_over)
        .every(sliding_window_every)
        .on(sliding_window_on)
        .alias(sliding_window_alias)
    )
    .group_by('ticker, {}'.format(sliding_window_alias))
    .select('FROM_UNIXTIME(28*60 * (UNIX_TIMESTAMP({0}.end) / (28*60))), '
            'ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as '
            'utc_start, {0}.end as utc_end'.format(
                sliding_window_alias
            ))
)
{code}

The following exception will be thrown:
{code}
py4j.protocol.Py4JJavaError: An error occurred while calling o75.select.
: org.apache.flink.table.api.ValidationException: Undefined function: 
FROM_UNIXTIME
at 
org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53)
at java.base/java.util.Optional.orElseThrow(Optional.java:408)
at 
org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
at 
org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35)
at 
org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
at 
org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at 
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at 
org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606)
at 
org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775)
at 
org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770)
{code}
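
For context, the first expression in the `select` call above appears intended 
to round the window end down to a 28-minute boundary. A plain-Python sketch of 
that arithmetic, assuming `UNIX_TIMESTAMP` yields whole seconds and the 
division truncates to an integer (neither is confirmed in the report); 
`floor_to_bucket` is a hypothetical helper name.

```python
BUCKET_SECONDS = 28 * 60  # 1680 seconds, the constant used in the expression


def floor_to_bucket(epoch_seconds, bucket=BUCKET_SECONDS):
    """Round an epoch timestamp (in seconds) down to a multiple of `bucket`."""
    return bucket * (epoch_seconds // bucket)


# An arbitrary example timestamp, chosen only for illustration.
print(floor_to_bucket(1655083223))
```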





[jira] [Created] (FLINK-27760) NPE is thrown when executing PyFlink jobs in batch mode

2022-05-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-27760:
---

 Summary: NPE is thrown when executing PyFlink jobs in batch mode
 Key: FLINK-27760
 URL: https://issues.apache.org/jira/browse/FLINK-27760
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0
Reporter: Dian Fu
Assignee: Dian Fu


This is the exception stack reported by one user:
{code}
022-05-25 11:39:32,792 [MultipleInput(readOrder=[2,1,0], 
members=[\nHashJoin(joinType=[I... -> PythonCal ... with job vertex id 
71d9b8e1b249eaa7e67ef93fb483177f (63/100)#123] WARN 
org.apache.flink.runtime.taskmanager.Task [] - MultipleInput(readOrder=[2,1,0], 
members=[\nHashJoin(joinType=[I... -> PythonCal ... with job vertex id 
71d9b8e1b249eaa7e67ef93fb483177f (63/100)#123 
(6fa78755ff19ac9d0d57aba21840e834) switched from INITIALIZING to FAILED with 
failure cause: java.lang.NullPointerException
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.getKeyedStateBackend(AbstractStreamOperator.java:466)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processElementsOfCurrentKeyIfNeeded(AbstractPythonFunctionOperator.java:238)
at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:208)
at 
org.apache.flink.table.runtime.operators.multipleinput.output.OneInputStreamOperatorOutput.emitWatermark(OneInputStreamOperatorOutput.java:45)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632)
at 
org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74)
at 
org.apache.flink.table.runtime.operators.multipleinput.output.OneInputStreamOperatorOutput.emitWatermark(OneInputStreamOperatorOutput.java:45)
at 
org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:39)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:632)
at 
org.apache.flink.table.runtime.operators.TableStreamOperator.processWatermark(TableStreamOperator.java:74)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark2(AbstractStreamOperator.java:649)
at 
org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput.processWatermark(SecondInputOfTwoInput.java:44)
at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessorFactory$StreamTaskNetworkOutput.emitWatermark(StreamMultipleInputProcessorFactory.java:318)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:137)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:106)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at 
org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:87)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:569)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:651)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:541)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:586)
at java.lang.Thread.run(Thread.java:877)
{code}





[jira] [Created] (FLINK-27729) Support constructing StartCursor and StopCursor from MessageId

2022-05-21 Thread Dian Fu (Jira)
Dian Fu created FLINK-27729:
---

 Summary: Support constructing StartCursor and StopCursor from 
MessageId 
 Key: FLINK-27729
 URL: https://issues.apache.org/jira/browse/FLINK-27729
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Connectors / Pulsar
Reporter: Dian Fu
 Fix For: 1.16.0


Currently, StartCursor.fromMessageId and StopCursor.fromMessageId are still not 
supported in Python pulsar connectors. I think we could leverage the [pulsar 
Python library|https://pulsar.apache.org/api/python/#pulsar.MessageId] to 
implement these methods.





[jira] [Created] (FLINK-27598) Improve the exception message when mixing use Python UDF and Pandas UDF

2022-05-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-27598:
---

 Summary: Improve the exception message when mixing use Python UDF 
and Pandas UDF
 Key: FLINK-27598
 URL: https://issues.apache.org/jira/browse/FLINK-27598
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu


For the following job:
{code}

import argparse
from decimal import Decimal

from pyflink.common import Row
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import AggregateFunction, udaf


class DeduplicatedSum(AggregateFunction):
    def create_accumulator(self):
        return {int(0), float(0)}

    def get_value(self, accumulator) -> float:
        sum(accumulator.values())

    def accumulate(self, accumulator, k: int, v: float):
        if k not in accumulator:
            accumulator[k] = v

    def retract(self, accumulator, k: int, v: float):
        if k in accumulator:
            del accumulator[k]


deduplicated_sum = udaf(f=DeduplicatedSum(),
                        func_type="pandas",
                        result_type=DataTypes.DOUBLE(),
                        input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class FirstValue(AggregateFunction):
    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator) -> float:
        return accumulator[1]

    def accumulate(self, accumulator, k: int, v: float):
        ck = accumulator[0]
        if ck > k:
            accumulator[0] = k
            accumulator[1] = v


first_value = udaf(f=FirstValue(),
                   result_type=DataTypes.DOUBLE(),
                   func_type="pandas",
                   input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


class LastValue(AggregateFunction):
    def create_accumulator(self):
        return [int(-1), float(0)]

    def get_value(self, accumulator: Row) -> float:
        return accumulator[1]

    def accumulate(self, accumulator: Row, k: int, v: float):
        ck = accumulator[0]
        if ck < k:
            accumulator[0] = k
            accumulator[1] = v


last_value = udaf(f=LastValue(),
                  func_type="pandas",
                  result_type=DataTypes.DOUBLE(),
                  input_types=[DataTypes.BIGINT(), DataTypes.DOUBLE()])


def create_source_table_trades(table_env):
    source = f"""
        CREATE TABLE src_trade (
            `id` VARCHAR
            ,`timestamp` BIGINT
            ,`side` VARCHAR
            ,`price` DOUBLE
            ,`size` DOUBLE
            ,`uniqueId` BIGINT
            ,ts_micro AS `timestamp`
            ,ts_milli AS `timestamp` / 1000
            ,ts AS TO_TIMESTAMP_LTZ(`timestamp` / 1000, 3)
            ,WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
        ) WITH (
            'connector' = 'datagen')
    """
    table_env.execute_sql(source)


def create_sink_table(table_env):
    sink = f"""
        CREATE TABLE dst_kline (
            wst TIMESTAMP_LTZ(3)
            ,wet TIMESTAMP_LTZ(3)
            ,otm BIGINT
            ,ot TIMESTAMP_LTZ(3)
            ,ctm BIGINT
            ,ct TIMESTAMP_LTZ(3)
            ,ptm BIGINT
            ,pt TIMESTAMP_LTZ(3)
            ,`open` DOUBLE
            ,`close` DOUBLE
            ,`high` DOUBLE
            ,`low` DOUBLE
            ,`vol` DOUBLE -- total trade volume
            ,`to` DOUBLE -- total turnover value
            ,`rev` INT -- revision, something we might use for versioning
            ,`gap` INT -- if this value is reliable
            ,PRIMARY KEY(wst) NOT ENFORCED
        ) WITH (
            'connector' = 'print'
        )
    """
    table_env.execute_sql(sink)


def kafka_src_topic(value):
    if not len(value.split('-')) == 5:
        raise argparse.ArgumentTypeError("{} is not a valid kafka topic".format(value))
    return value


def interval(value):
    i = []
    prev_num = []
    for character in value:
        if character.isalpha():
            if prev_num:
                num = Decimal(''.join(prev_num))
                if character == 'd':
                    i.append(f"'{num}' DAYS")
                elif character == 'h':
                    i.append(f"'{num}' HOURS")
                elif character == 'm':
                    i.append(f"'{num}' MINUTES")
                elif character == 's':
                    i.append(f"'{num}' SECONDS")
                prev_num = []
        elif character.isnumeric() or character == '.':
            prev_num.append(character)
    return " ".join(i)


def fetch_arguments_flink_kline():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap-servers', type=str, required=True)
    parser.add_argument('--src-topic', type=kafka_src_topic)
    parser.add_argument('--consume-mode', type=str, default='group-offsets',
                        choices=['group-offsets', 'latest-offset'],
                        help='scan.startup.mode for kafka')
    parser.add_argument('--interval', type=str, default='20s',
                        help='output interval e.g. 5d4h3m1s, default to 20s')
    parser.add_argument('--force-test', action='store_true')
    parser.add_argument('--consumer-group-hint', type=str, default='1')
    args = parser.parse_args()
    if args.force_test and args.consumer_group_hint == '1':
        parser.error("With --force-test, should not use default '1' for --consumer-group-hint")
    return args


def main():
    # args = fetch_arguments_flink_kline()
    # parts = args.src_topic.split('-')
    # _, e, p, s, _ = parts
    # dst_topic = f'{e}-{p}-{s}-Kline{args.interval}'

    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(env)
    # table_env.get_config().get_configuration().set_boolean("table.exec.emit.early-fire.enabled", True)
    # table_env.get_config().get_configuration().set_string("table.exec.emit.early-fire.delay", "0 s")
    table_env.get_config().get_configuration().set_string("table.exec.emit.allow-lateness", "1 h")
# 

[jira] [Created] (FLINK-27584) Support broadcast state in PyFlink DataStream API

2022-05-12 Thread Dian Fu (Jira)
Dian Fu created FLINK-27584:
---

 Summary: Support broadcast state in PyFlink DataStream API
 Key: FLINK-27584
 URL: https://issues.apache.org/jira/browse/FLINK-27584
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Reporter: Dian Fu
Assignee: Juntao Hu
 Fix For: 1.16.0








[jira] [Created] (FLINK-27564) Split PyFlink DataStream connectors into separate files

2022-05-10 Thread Dian Fu (Jira)
Dian Fu created FLINK-27564:
---

 Summary: Split PyFlink DataStream connectors into separate files
 Key: FLINK-27564
 URL: https://issues.apache.org/jira/browse/FLINK-27564
 Project: Flink
  Issue Type: Technical Debt
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0


Currently all the connectors are located in 
[connectors.py|https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py].
 As more and more connectors are added, this file keeps growing. 
Besides, it's confusing for users, as it's not clear which classes 
belong to which connectors. It would be great to split different connectors 
into different files, e.g. pyflink/datastream/connectors/jdbc.py, 
pyflink/datastream/connectors/file_system.py, etc.

However, backward compatibility should be kept when 
performing this refactoring.
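
One common way to keep the old import path working after such a split is to 
re-export the moved classes from the package itself. The sketch below simulates 
this in memory; the package name `demo_connectors` and the class `JdbcSink` are 
stand-ins for illustration and are not the real PyFlink modules or classes.

```python
import sys
import types

# Simulate the proposed layout in memory. "demo_connectors" and JdbcSink are
# illustrative stand-ins, not the real PyFlink package or classes.
jdbc = types.ModuleType("demo_connectors.jdbc")


class JdbcSink:
    """Placeholder for a connector class that moved into its own module."""


jdbc.JdbcSink = JdbcSink

package = types.ModuleType("demo_connectors")
package.__path__ = []              # mark the module as a package
package.jdbc = jdbc
package.JdbcSink = jdbc.JdbcSink   # re-export so the old import path still works

sys.modules["demo_connectors"] = package
sys.modules["demo_connectors.jdbc"] = jdbc

# Both the old (package-level) and the new (per-connector) import paths
# resolve to the same class object.
from demo_connectors import JdbcSink as OldPathSink
from demo_connectors.jdbc import JdbcSink as NewPathSink

print(OldPathSink is NewPathSink)
```

In a real package the re-export line would live in the package's 
`__init__.py`, so existing user code importing from the package level would 
not need to change.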





[jira] [Created] (FLINK-27545) Update examples in PyFlink shell

2022-05-08 Thread Dian Fu (Jira)
Dian Fu created FLINK-27545:
---

 Summary: Update examples in PyFlink shell
 Key: FLINK-27545
 URL: https://issues.apache.org/jira/browse/FLINK-27545
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Examples
Affects Versions: 1.15.0, 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu


The examples in pyflink.shell are outdated and we should update them.





[jira] [Created] (FLINK-27421) Bundle test utility classes into the PyFlink package to make users write test cases easily

2022-04-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-27421:
---

 Summary: Bundle test utility classes into the PyFlink package to 
make users write test cases easily
 Key: FLINK-27421
 URL: https://issues.apache.org/jira/browse/FLINK-27421
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0


Currently, the test utility classes are not bundled in the PyFlink package. 
This doesn't affect functionality. However, when users are trying out 
PyFlink and developing jobs with PyFlink, they may want to write some unit 
tests to ensure their jobs work as expected. There are already some 
test utility classes in PyFlink; bundling them in the PyFlink package could 
help users a lot and allow them to write unit tests more easily.

See https://lists.apache.org/thread/9z468o1hmg4bm7b2vz2o3lkmoqhxnxg1 for more 
details.
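
Until such utilities are bundled, one option is to keep the job logic in plain 
Python functions and test those with the standard `unittest` module. The sketch 
below does not use PyFlink's own test utility classes (their names are not 
given here); `normalize_ticker` is a hypothetical function standing in for the 
body of a user-defined function.

```python
import unittest


def normalize_ticker(value):
    """Hypothetical job logic: trim whitespace and upper-case a ticker symbol."""
    return value.strip().upper()


class NormalizeTickerTest(unittest.TestCase):
    def test_strips_and_uppercases(self):
        self.assertEqual(normalize_ticker(" amzn "), "AMZN")

    def test_leaves_clean_input_unchanged(self):
        self.assertEqual(normalize_ticker("MSFT"), "MSFT")


# Run only this test case, without relying on command-line discovery.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(NormalizeTickerTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```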





[jira] [Created] (FLINK-27373) Disable installing PyFlink using `python setup.py install`

2022-04-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-27373:
---

 Summary: Disable installing PyFlink using `python setup.py install`
 Key: FLINK-27373
 URL: https://issues.apache.org/jira/browse/FLINK-27373
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


When users install PyFlink using `pip install .` or `pip install apache-flink`, 
it will be installed into site-packages with the following directories:
{code}
apache_flink-1.14.4.dist-info
apache_flink_libraries-1.14.4.dist-info
pyflink
{code}

However, if installed using `python setup.py install`, the resulting directories 
will be as follows:
{code}
apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg
apache_flink_libraries-1.14.4-py3.8.egg
{code} 

The consequence of the latter case is that the installed package is 
broken. The reason is that the jar packages are contained in 
apache_flink_libraries and currently these jar packages need to be installed 
together with apache_flink (under the directories pyflink/lib and 
pyflink/opt) to make them work.

Users will get the following error when installing PyFlink via "python setup.py 
install":
{code}
Error: Could not find or load main class 
org.apache.flink.client.python.PythonGatewayServer
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.client.python.PythonGatewayServer
E/Users/dianfu/miniconda3/lib/python3.8/unittest/case.py:704: ResourceWarning: 
unclosed file <_io.BufferedWriter name=5>
  outcome.errors.clear()
ResourceWarning: Enable tracemalloc to get the object allocation traceback

==
ERROR: test_scalar_function (test_table_api.TableTests)
--
Traceback (most recent call last):
  File "/Users/dianfu/code/src/github/pyflink-faq/testing/test_utils.py", line 
122, in setUp
self.t_env = 
TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File 
"/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/table/environment_settings.py",
 line 267, in in_streaming_mode
get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File 
"/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py",
 line 62, in get_gateway
_gateway = launch_gateway()
  File 
"/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-10.9-x86_64.egg/pyflink/java_gateway.py",
 line 112, in launch_gateway
raise Exception("Java gateway process exited before sending its port 
number")
Exception: Java gateway process exited before sending its port number
{code}

`python setup.py install` is already deprecated in the Python community, which 
recommends using `pip install` instead. We have two choices:
- Support installing PyFlink using `python setup.py install`
- Disable installing PyFlink using `python setup.py install` and throw a 
meaningful exception if users install it with `python setup.py install`, to make 
the failure explicit
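
The second choice could look roughly like the sketch below. It is a minimal illustration only: the function name, the exception class and the message are hypothetical and not taken from the PyFlink code base. It only inspects the command line, so a real check would also need to tell apart the cases where a packaging tool invokes setup.py on the user's behalf, which this sketch does not attempt.

```python
class UnsupportedInstallCommand(RuntimeError):
    """Raised when setup.py is asked to run an unsupported command."""


def reject_setup_py_install(argv):
    """Fail fast if the script was invoked as `python setup.py install`.

    `argv` has the shape of sys.argv: argv[0] is the script path and the
    remaining items are the setup.py commands and options.
    """
    if "install" in argv[1:]:
        raise UnsupportedInstallCommand(
            "Installing PyFlink via `python setup.py install` is not "
            "supported. Please use `pip install .` instead."
        )


# Other commands pass through untouched.
reject_setup_py_install(["setup.py", "sdist"])
```

In a setup.py, such a check would be called with `sys.argv` before `setup(...)` runs, so that users see a clear message instead of the gateway error above.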






[jira] [Created] (FLINK-27223) State access doesn't work as expected when cache size is set to 0

2022-04-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-27223:
---

 Summary: State access doesn't work as expected when cache size is 
set to 0
 Key: FLINK-27223
 URL: https://issues.apache.org/jira/browse/FLINK-27223
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.1


For the following job:
{code}
import json
import logging
import sys

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    config = Configuration(
        j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
    config.set_integer("python.state.cache-size", 0)
    env.set_parallelism(1)

    # define the source
    ds = env.from_collection(
        collection=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
    )

    # key by
    ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                              json.loads(data.info)['tel'])) \
        .key_by(lambda data: data[0]).sum(1)
    ds.print()
    env.execute()
{code}

The expected result should be:
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 167)
{code}

However, the actual result is:
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 32)
{code}





[jira] [Created] (FLINK-27126) Respect the state cache size configurations in Python DataStream API operators

2022-04-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-27126:
---

 Summary: Respect the state cache size configurations in Python 
DataStream API operators
 Key: FLINK-27126
 URL: https://issues.apache.org/jira/browse/FLINK-27126
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently, the state cache size configurations are not handled in Python 
DataStream API operators. See 
https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/beam/beam_operations.py#L188
 for more details.





[jira] [Created] (FLINK-27108) State cache clean up doesn't work as expected

2022-04-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-27108:
---

 Summary: State cache clean up doesn't work as expected
 Key: FLINK-27108
 URL: https://issues.apache.org/jira/browse/FLINK-27108
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0, 1.13.0, 1.15.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.0, 1.13.7, 1.14.5


The test case test_session_window_late_merge failed when working on 
FLINK-26190. After digging into this problem, the cause appears to be that the 
logic to determine [whether a key & namespace exists in the state 
cache|https://github.com/apache/flink/blob/master/flink-python/pyflink/fn_execution/state_impl.py#L1183]
 is wrong. As a result, the state cache isn't cleaned up when it becomes invalid. 





[jira] [Created] (FLINK-27067) Prevent usage of deprecated APIs in PyFlink examples

2022-04-05 Thread Dian Fu (Jira)
Dian Fu created FLINK-27067:
---

 Summary: Prevent usage of deprecated APIs in PyFlink examples
 Key: FLINK-27067
 URL: https://issues.apache.org/jira/browse/FLINK-27067
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Examples
Reporter: Dian Fu


Usage of deprecated APIs in Java/Scala examples has been prevented in 
FLINK-24833. It would be great to enforce the same for Python 
examples.





[jira] [Created] (FLINK-26986) Remove deprecated string expressions in Python Table API

2022-04-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-26986:
---

 Summary: Remove deprecated string expressions in Python Table API
 Key: FLINK-26986
 URL: https://issues.apache.org/jira/browse/FLINK-26986
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.0


FLINK-26704 removed the string expressions from the Table API. However, 
some APIs in the Python Table API still use string expressions. 
They should no longer work, as the string expressions have already 
been removed from the Java Table API. 





[jira] [Created] (FLINK-26920) It reported "The configured managed memory fraction for Python worker process must be within (0, 1], was: %s."

2022-03-30 Thread Dian Fu (Jira)
Dian Fu created FLINK-26920:
---

 Summary: It reported "The configured managed memory fraction for 
Python worker process must be within (0, 1], was: %s."
 Key: FLINK-26920
 URL: https://issues.apache.org/jira/browse/FLINK-26920
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0, 1.13.0, 1.12.0, 1.15.0
Reporter: Dian Fu


For the following code:
{code}
import numpy as np
from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.table import StreamTableEnvironment
from sklearn import svm, datasets

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Table Source
t_env.execute_sql("""
CREATE TABLE my_source (
a FLOAT,
key STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.a.min' = '4.3',
'fields.a.max' = '7.9',
'fields.key.length' = '10'
)
""")


def process_type():
    return Types.ROW_NAMED(
        ["a", "key"],
        [Types.FLOAT(), Types.STRING()]
    )


# append only datastream
ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    process_type())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        clf = svm.SVC()
        X, y = datasets.load_iris(return_X_y=True)
        clf.fit(X, y)

        self.model = clf

    def process_element(self, value: Row, ctx: 'KeyedProcessFunction.Context'):

        # query the round settlement log from redis based on role_id + space

        features = np.array([value['a'], 3.5, 1.4, 0.2]).reshape(1, -1)
        predict = int(self.model.predict(features)[0])

        yield Row(predict=predict, role_id=value['key'])


ds = ds.key_by(lambda a: a['key'], key_type=Types.STRING()) \
    .process(
        MyKeyedProcessFunction(),
        output_type=Types.ROW_NAMED(
            ["hit", "role_id"],
            [Types.INT(), Types.STRING()]
        ))


# use a table sink
t_env.execute_sql("""
CREATE TABLE my_sink (
  hit INT,
  role_id STRING
) WITH (
  'connector' = 'print'
)
""")

t_env.create_temporary_view("predict", ds)
t_env.execute_sql("""
INSERT INTO my_sink
SELECT * FROM predict
""").wait()
{code}

It reported the following exception:
{code}
Caused by: java.lang.IllegalArgumentException: The configured managed memory 
fraction for Python worker process must be within (0, 1], was: %s. It may be 
because the consumer type "Python" was missing or set to 0 for the config 
option "taskmanager.memory.managed.consumer-weights".0.0
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:233)
at 
org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:56)
at 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
at 
org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.open(PythonKeyedProcessOperator.java:121)
at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:712)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:655)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
{code}
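
The message refers to the config option "taskmanager.memory.managed.consumer-weights". As a rough illustration of how a missing or zero "Python" weight leads to a fraction of 0.0, the sketch below parses a weights string and normalizes it. The `NAME:weight,...` value format, the example weight strings and all helper names are assumptions made for this sketch; it is a simplified model and not Flink's actual computation.

```python
def parse_consumer_weights(value):
    """Parse 'NAME:weight,NAME:weight' into a dict (format is an assumption)."""
    weights = {}
    for item in value.split(","):
        name, weight = item.split(":")
        weights[name.strip().upper()] = int(weight)
    return weights


def managed_memory_fraction(weights, consumer):
    """Share of the total weight that belongs to `consumer` (0.0 if absent)."""
    total = sum(weights.values())
    if total == 0:
        return 0.0
    return weights.get(consumer.upper(), 0) / total


def check_python_fraction(fraction):
    """Mirror the range check from the error message: must be in (0, 1]."""
    if not (0 < fraction <= 1):
        raise ValueError(
            "managed memory fraction for Python must be within (0, 1], "
            "was: %s" % fraction)


# Illustrative values only.
with_python = parse_consumer_weights("OPERATOR:70,STATE_BACKEND:70,PYTHON:30")
without_python = parse_consumer_weights("OPERATOR:70,STATE_BACKEND:70")

check_python_fraction(managed_memory_fraction(with_python, "PYTHON"))  # passes
print(managed_memory_fraction(without_python, "PYTHON"))  # fraction is zero
```

Under this model, the range check fails exactly when the "Python" entry is absent or has weight 0, which matches the hint in the exception text.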





[jira] [Created] (FLINK-26919) Table API program compiles field with "Cannot resolve field [a], input field list:[EXPR$0]"

2022-03-29 Thread Dian Fu (Jira)
Dian Fu created FLINK-26919:
---

 Summary: Table API program compiles field with "Cannot resolve 
field [a], input field list:[EXPR$0]"
 Key: FLINK-26919
 URL: https://issues.apache.org/jira/browse/FLINK-26919
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Table SQL / Planner
Affects Versions: 1.14.0
Reporter: Dian Fu


For the following job:
{code}
import datetime as dt

from pyflink.table import *
from pyflink.table.window import Tumble
from pyflink.table.expressions import col, lit, UNBOUNDED_RANGE, CURRENT_RANGE


def make_table():
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

    source_data_path = '///path/to/source/directory/test.csv'
    sink_data_path = '///path/to/sink/directory/'
    source_ddl = f"""
        create table Orders(
            a VARCHAR,
            b BIGINT,
            c BIGINT,
            rowtime TIMESTAMP(3),
            WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{source_data_path}'
        )
    """

    table_env.execute_sql(source_ddl)

    sink_ddl = f"""
        create table `Result`(
            a VARCHAR,
            cnt BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = '{sink_data_path}'
        )
    """
    table_env.execute_sql(sink_ddl)

    orders = table_env.from_path('Orders')
    orders.order_by(orders.a).select(orders.a, orders.b.count) \
        .execute_insert("Result").wait()


if __name__ == '__main__':
    make_table()
{code}

It fails to compile with the following exception:
{code}
org.apache.flink.table.api.ValidationException: Cannot resolve field [a], input 
field list:[EXPR$0].
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.failForField(ReferenceResolverRule.java:93)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$3(ReferenceResolverRule.java:87)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$null$4(ReferenceResolverRule.java:85)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.lambda$visit$5(ReferenceResolverRule.java:79)
at java.util.Optional.orElseGet(Optional.java:267)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:73)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule$ExpressionResolverVisitor.visit(ReferenceResolverRule.java:51)
at 
org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:29)
at 
org.apache.flink.table.expressions.UnresolvedReferenceExpression.accept(UnresolvedReferenceExpression.java:59)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.lambda$apply$0(ReferenceResolverRule.java:47)
at 
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.flink.table.expressions.resolver.rules.ReferenceResolverRule.apply(ReferenceResolverRule.java:48)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.lambda$null$2(ExpressionResolver.java:241)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at java.util.function.Function.lambda$andThen$1(Function.java:88)
at 
org.apache.flink.table.expressions.resolver.ExpressionResolver.resolve(ExpressionResolver.java:204)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.projectInternal(OperationTreeBuilder.java:194)
at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:169)
at 
org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:138)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

[jira] [Created] (FLINK-26855) ImportError: cannot import name 'environmentfilter' from 'jinja2'

2022-03-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-26855:
---

 Summary: ImportError: cannot import name 'environmentfilter' from 
'jinja2'
 Key: FLINK-26855
 URL: https://issues.apache.org/jira/browse/FLINK-26855
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0, 1.16.0
Reporter: Dian Fu


{code}
Mar 24 17:38:39 ===mypy checks... [SUCCESS]===
Mar 24 17:38:39 rm -rf _build/*
Mar 24 17:38:39 /__w/2/s/flink-python/dev/.conda/bin/sphinx-build -b html -d 
_build/doctrees  -a -W . _build/html
Mar 24 17:38:40 Traceback (most recent call last):
Mar 24 17:38:40   File "/__w/2/s/flink-python/dev/.conda/bin/sphinx-build", 
line 6, in <module>
Mar 24 17:38:40 from sphinx.cmd.build import main
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/cmd/build.py",
 line 23, in <module>
Mar 24 17:38:40 from sphinx.application import Sphinx
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/application.py",
 line 42, in <module>
Mar 24 17:38:40 from sphinx.highlighting import lexer_classes, lexers
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/highlighting.py",
 line 30, in <module>
Mar 24 17:38:40 from sphinx.ext import doctest
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/ext/doctest.py",
 line 28, in <module>
Mar 24 17:38:40 from sphinx.builders import Builder
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/builders/__init__.py",
 line 24, in <module>
Mar 24 17:38:40 from sphinx.io import read_doc
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/io.py", 
line 42, in <module>
Mar 24 17:38:40 from sphinx.util.rst import append_epilog, docinfo_re, 
prepend_prolog
Mar 24 17:38:40   File 
"/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/sphinx/util/rst.py",
 line 22, in <module>
Mar 24 17:38:40 from jinja2 import environmentfilter
Mar 24 17:38:40 ImportError: cannot import name 'environmentfilter' from 
'jinja2' 
(/__w/2/s/flink-python/dev/.conda/lib/python3.7/site-packages/jinja2/__init__.py)
Mar 24 17:38:40 Makefile:76: recipe for target 'html' failed
Mar 24 17:38:40 make: *** [html] Error 1
Mar 24 17:38:40 ==sphinx checks... [FAILED]===
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=33717&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=23450





[jira] [Created] (FLINK-26847) Unify the logic for the command line option -py and -pym

2022-03-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-26847:
---

 Summary: Unify the logic for the command line option -py and -pym
 Key: FLINK-26847
 URL: https://issues.apache.org/jira/browse/FLINK-26847
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu








[jira] [Created] (FLINK-26846) Gauge metrics doesn't work in PyFlink

2022-03-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-26846:
---

 Summary: Gauge metrics doesn't work in PyFlink
 Key: FLINK-26846
 URL: https://issues.apache.org/jira/browse/FLINK-26846
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.12.0
Reporter: Dian Fu
Assignee: Dian Fu


See https://lists.apache.org/thread/w7jkwgpon6qy4p6k1nhhw5k4m81r8c8p for more 
details.





[jira] [Created] (FLINK-26506) Support StreamExecutionEnvironment.registerCachedFile in Python DataStream API

2022-03-06 Thread Dian Fu (Jira)
Dian Fu created FLINK-26506:
---

 Summary: Support StreamExecutionEnvironment.registerCachedFile in 
Python DataStream API
 Key: FLINK-26506
 URL: https://issues.apache.org/jira/browse/FLINK-26506
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0


This API is missing from the Python DataStream API.





[jira] [Created] (FLINK-26483) Support WindowedStream.aggregate in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26483:
---

 Summary: Support WindowedStream.aggregate in Python DataStream API
 Key: FLINK-26483
 URL: https://issues.apache.org/jira/browse/FLINK-26483
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0








[jira] [Created] (FLINK-26482) Support WindowedStream.reduce in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26482:
---

 Summary: Support WindowedStream.reduce in Python DataStream API
 Key: FLINK-26482
 URL: https://issues.apache.org/jira/browse/FLINK-26482
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0








[jira] [Created] (FLINK-26481) Support side output for windowing in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26481:
---

 Summary: Support side output for windowing in Python DataStream API
 Key: FLINK-26481
 URL: https://issues.apache.org/jira/browse/FLINK-26481
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0








[jira] [Created] (FLINK-26480) Support window_all in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26480:
---

 Summary: Support window_all in Python DataStream API
 Key: FLINK-26480
 URL: https://issues.apache.org/jira/browse/FLINK-26480
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0








[jira] [Created] (FLINK-26479) Add time_window and count_window in KeyedStream in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26479:
---

 Summary: Add time_window and count_window in KeyedStream in Python 
DataStream API
 Key: FLINK-26479
 URL: https://issues.apache.org/jira/browse/FLINK-26479
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0


This is to align with the Java API and provide a few handy methods to use.





[jira] [Created] (FLINK-26478) Support evictor in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26478:
---

 Summary: Support evictor in Python DataStream API
 Key: FLINK-26478
 URL: https://issues.apache.org/jira/browse/FLINK-26478
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu


Currently, evictors are still not supported in the Python DataStream API.





[jira] [Created] (FLINK-26477) Finalize the window support in Python DataStream API

2022-03-03 Thread Dian Fu (Jira)
Dian Fu created FLINK-26477:
---

 Summary: Finalize the window support in Python DataStream API
 Key: FLINK-26477
 URL: https://issues.apache.org/jira/browse/FLINK-26477
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.16.0


Window support in the Python DataStream API was added in FLINK-21842. 
However, a few pieces of functionality are still missing, e.g. evictors. This 
is an umbrella JIRA to collect the remaining work.





[jira] [Created] (FLINK-25676) Add set_description in Python API

2022-01-17 Thread Dian Fu (Jira)
Dian Fu created FLINK-25676:
---

 Summary: Add set_description in Python API
 Key: FLINK-25676
 URL: https://issues.apache.org/jira/browse/FLINK-25676
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.0


This is to align with the setDescription method introduced in FLINK-25072.





[jira] [Created] (FLINK-25231) Update PyFlink to use the new type system

2021-12-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-25231:
---

 Summary: Update PyFlink to use the new type system
 Key: FLINK-25231
 URL: https://issues.apache.org/jira/browse/FLINK-25231
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.15.0


Currently, many places in the PyFlink Table API still use the 
legacy type system. We need to revisit this and migrate them to the new type 
system (DataType/LogicalType).





[jira] [Created] (FLINK-25216) test_sliding_group_window_over_proctime failed with "IndexError: list index out of range"

2021-12-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-25216:
---

 Summary: test_sliding_group_window_over_proctime failed with 
"IndexError: list index out of range"
 Key: FLINK-25216
 URL: https://issues.apache.org/jira/browse/FLINK-25216
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.15.0
Reporter: Dian Fu


{code}
self = 
Dec 06 03:10:43 
Dec 06 03:10:43 def test_sliding_group_window_over_proctime(self):
Dec 06 03:10:43 
self.t_env.get_config().get_configuration().set_string("parallelism.default", 
"1")
Dec 06 03:10:43 from pyflink.table.window import Slide
Dec 06 03:10:43 self.t_env.register_function("mean_udaf", mean_udaf)
Dec 06 03:10:43 
Dec 06 03:10:43 source_table = """
Dec 06 03:10:43 create table source_table(
Dec 06 03:10:43 a INT,
Dec 06 03:10:43 proctime as PROCTIME()
Dec 06 03:10:43 ) with(
Dec 06 03:10:43 'connector' = 'datagen',
Dec 06 03:10:43 'rows-per-second' = '1',
Dec 06 03:10:43 'fields.a.kind' = 'sequence',
Dec 06 03:10:43 'fields.a.start' = '1',
Dec 06 03:10:43 'fields.a.end' = '10'
Dec 06 03:10:43 )
Dec 06 03:10:43 """
Dec 06 03:10:43 self.t_env.execute_sql(source_table)
Dec 06 03:10:43 t = self.t_env.from_path("source_table")
Dec 06 03:10:43 iterator = t.select("a, proctime") \
Dec 06 03:10:43 
.window(Slide.over("1.seconds").every("1.seconds").on("proctime").alias("w")) \
Dec 06 03:10:43 .group_by("a, w") \
Dec 06 03:10:43 .select("mean_udaf(a) as b, 
w.start").execute().collect()
Dec 06 03:10:43 result = [i for i in iterator]
Dec 06 03:10:43 # if the WindowAssigner.isEventTime() does not return 
false,
Dec 06 03:10:43 # the w.start would be 1970-01-01
Dec 06 03:10:43 # TODO: After fixing the TimeZone problem of window 
with processing time (will be fixed in
Dec 06 03:10:43 # FLIP-162), we should replace it with a more accurate 
assertion.
Dec 06 03:10:43 >   self.assertTrue(result[0][1].year > 1970)
Dec 06 03:10:43 E   IndexError: list index out of range
{code}

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=27569&view=logs&j=3e4dd1a2-fe2f-5e5d-a581-48087e718d53&t=b4612f28-e3b5-5853-8a8b-610ae894217a&l=21780





[jira] [Created] (FLINK-24962) The root cause of a failed job was hidden in certain cases

2021-11-19 Thread Dian Fu (Jira)
Dian Fu created FLINK-24962:
---

 Summary: The root cause of a failed job was hidden in certain cases
 Key: FLINK-24962
 URL: https://issues.apache.org/jira/browse/FLINK-24962
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
 Fix For: 1.14.1


For the following job:
{code}

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, DataStream
from pyflink.table import StreamTableEnvironment


def state_access_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    sql = """CREATE TABLE kafka_source (
        id int,
        name string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'test',
        'properties.bootstrap.servers' = '***',
        'properties.group.id' = '***',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
    )"""
    t_env.execute_sql(sql)
    table = t_env.from_path("kafka_source")
    ds: DataStream = t_env.to_append_stream(table, 
type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.map(lambda a: print(a))
    env.execute('state_access_demo')

if __name__ == '__main__':
    state_access_demo()
{code}
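
One detail of the job above is worth spelling out: `print` returns `None`, so `lambda a: print(a)` hands `None` to the downstream operator for every element. The plain-Python sketch below (no Flink involved, function names are illustrative) shows the difference between a map function that only prints and one that also returns its input.

```python
def print_only(value):
    # Mirrors `lambda a: print(a)`: print() returns None, so this does too.
    print(value)


def print_and_return(value):
    # Log the element, then pass it on downstream.
    print(value)
    return value


rows = [(1, "a"), (2, "b")]
dropped = [print_only(row) for row in rows]        # every result is None
kept = [print_and_return(row) for row in rows]     # elements are preserved
```

In the job, the equivalent change would be to return the element (or a transformed value) from the function passed to `map`.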

This job failed with the following exception, which doesn't contain any useful 
information (the root cause is that the map function should return a 
value):
{code}

Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! 
INFO:root:Initializing Python harness: 
D:\soft\anaconda\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py 
--id=2-1 --provision_endpoint=localhost:57201
INFO:root:Starting up Python harness in loopback mode.

    at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:566)
    at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:255)
    at 
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:131)
    at 
org.apache.flink.streaming.api.operators.python.AbstractOneInputPythonFunctionOperator.open(AbstractOneInputPythonFunctionOperator.java:116)
    at 
org.apache.flink.streaming.api.operators.python.PythonProcessOperator.open(PythonProcessOperator.java:59)
    at 
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
 java.lang.IllegalStateException: Process died with exit code 0
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:564)
    ... 14 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
    at 

[jira] [Created] (FLINK-24662) PyFlink sphinx check failed with "node class 'meta' is already registered, its visitors will be overridden"

2021-10-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-24662:
---

 Summary: PyFlink sphinx check failed with "node class 'meta' is 
already registered, its visitors will be overridden"
 Key: FLINK-24662
 URL: https://issues.apache.org/jira/browse/FLINK-24662
 Project: Flink
  Issue Type: Bug
  Components: API / Python, Tests
Affects Versions: 1.14.0, 1.13.0, 1.15.0
Reporter: Dian Fu
Assignee: Dian Fu


[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=25481=logs=9cada3cb-c1d3-5621-16da-0f718fb86602=8d78fe4f-d658-5c70-12f8-4921589024c3]

{code}
==mypy checks... [SUCCESS]===
Oct 26 22:08:34 rm -rf _build/*
Oct 26 22:08:34 /__w/1/s/flink-python/dev/.conda/bin/sphinx-build -b html -d _build/doctrees  -a -W . _build/html
Oct 26 22:08:34 Running Sphinx v2.4.4
Oct 26 22:08:34 
Oct 26 22:08:34 Warning, treated as error:
Oct 26 22:08:34 node class 'meta' is already registered, its visitors will be overridden
Oct 26 22:08:34 Makefile:76: recipe for target 'html' failed
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24287) Bump virtualenv to the latest version

2021-09-14 Thread Dian Fu (Jira)
Dian Fu created FLINK-24287:
---

 Summary: Bump virtualenv to the latest version
 Key: FLINK-24287
 URL: https://issues.apache.org/jira/browse/FLINK-24287
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Tests
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.15.0


Currently, the virtualenv version (16.0.0) used in PyFlink tests is somewhat 
outdated (the latest version is ). The pip bundled with this old virtualenv is 
also old. As a consequence, installing grpcio compiles the library from source 
instead of using the wheel package. Compiling grpcio takes several minutes, 
which could be avoided by bumping virtualenv.





[jira] [Created] (FLINK-24244) Add logging about whether it's executed in loopback mode

2021-09-10 Thread Dian Fu (Jira)
Dian Fu created FLINK-24244:
---

 Summary: Add logging about whether it's executed in loopback mode
 Key: FLINK-24244
 URL: https://issues.apache.org/jira/browse/FLINK-24244
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


Currently, it's unclear whether a job is running in loopback mode or process 
mode. It would be great to add some logging to make this clear. This would be 
helpful for debugging, e.g. to tell whether a failed test was running in 
loopback mode or process mode.
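
As a rough illustration of the kind of logging requested, here is a minimal sketch using Python's standard `logging` module. The logger name, the function names and the message wording are all hypothetical; they are not taken from the PyFlink code base.

```python
import logging

# Hypothetical logger name, chosen only for this sketch.
logger = logging.getLogger("pyflink.example.execution_mode")


def describe_execution_mode(is_loopback):
    """Build the log message for the given execution mode."""
    mode = "loopback" if is_loopback else "process"
    return "Python worker is running in %s mode" % mode


def log_execution_mode(is_loopback):
    """Log the execution mode once at INFO level and return the message."""
    message = describe_execution_mode(is_loopback)
    logger.info(message)
    return message
```

With such a line in the logs, a failed test run can be attributed to one mode or the other without inspecting the job configuration.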





[jira] [Created] (FLINK-24243) Clean up the code and avoid warning messages introduced by deprecated API

2021-09-10 Thread Dian Fu (Jira)
Dian Fu created FLINK-24243:
---

 Summary: Clean up the code and avoid warning messages introduced 
by deprecated API
 Key: FLINK-24243
 URL: https://issues.apache.org/jira/browse/FLINK-24243
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu


Currently, there are quite a few warning messages when executing PyFlink jobs, 
e.g.
{code}
Process finished with exit code 0
/usr/local/Cellar/python@3.7/3.7.10_2/Frameworks/Python.framework/Versions/3.7/lib/python3.7/subprocess.py:883: ResourceWarning: subprocess 75115 is still running
  ResourceWarning, source=self)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
/Users/dianfu/code/src/apache/flink/flink-python/pyflink/table/udf.py:326: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
  if not isinstance(input_types, collections.Iterable) \
/Users/dianfu/code/src/apache/flink/flink-python/pyflink/table/table_environment.py:537: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
  warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/future/standard_library/__init__.py:65: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
  import imp
2021-09-10 15:03:47,335 - apache_beam.typehints.native_type_compatibility - INFO - Using Any for unsupported type: typing.Sequence[~T]
/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/state_impl.py:677: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
  class RemovableConcatIterator(collections.Iterator):
/Users/dianfu/code/src/apache/flink/flink-python/pyflink/fn_execution/utils/operation_utils.py:19: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working
  from collections import Generator
{code}

We should clean up the code and avoid the warning messages caused by deprecated 
APIs by switching to the latest APIs.
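
For the `collections` warnings above, the usual fix is to import the abstract base classes from `collections.abc`. The sketch below shows the pattern for an `isinstance` check and for an iterator subclass; `is_iterable` and `ConcatIterator` are hypothetical stand-ins written for this example, not the actual PyFlink code.

```python
from collections.abc import Iterable, Iterator


def is_iterable(value):
    """Check against the ABC imported from collections.abc."""
    return isinstance(value, Iterable)


class ConcatIterator(Iterator):
    """Hypothetical stand-in: iterates over several iterables in order."""

    def __init__(self, *iterables):
        self._iterables = iter(iterables)
        self._current = iter(())

    def __next__(self):
        while True:
            try:
                return next(self._current)
            except StopIteration:
                # Move on to the next iterable; when none are left, the
                # StopIteration raised here ends the whole iteration.
                self._current = iter(next(self._iterables))
```

Subclassing `Iterator` from `collections.abc` also supplies `__iter__`, so only `__next__` has to be written.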





[jira] [Created] (FLINK-24101) Support to generate PyDocs without Flink distribution

2021-09-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-24101:
---

 Summary: Support to generate PyDocs without Flink distribution
 Key: FLINK-24101
 URL: https://issues.apache.org/jira/browse/FLINK-24101
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-24097) Remove the streaming check in StreamTableEnvironment in PyFlink

2021-09-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-24097:
---

 Summary: Remove the streaming check in StreamTableEnvironment in 
PyFlink
 Key: FLINK-24097
 URL: https://issues.apache.org/jira/browse/FLINK-24097
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


Since FLINK-20897 added support for DataStream batch mode in 
StreamTableEnvironment, it should also work in PyFlink. Currently there are a 
few streaming checks in the Python code and we should remove them.





[jira] [Created] (FLINK-24083) The result isn't as expected when the result type is generator of string for Python UDTF

2021-08-31 Thread Dian Fu (Jira)
Dian Fu created FLINK-24083:
---

 Summary: The result isn't as expected when the result type is 
generator of string for Python UDTF
 Key: FLINK-24083
 URL: https://issues.apache.org/jira/browse/FLINK-24083
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-24082) Python UDTF throws exception when the result type is generator of Row

2021-08-31 Thread Dian Fu (Jira)
Dian Fu created FLINK-24082:
---

 Summary: Python UDTF throws exception when the result type is 
generator of Row
 Key: FLINK-24082
 URL: https://issues.apache.org/jira/browse/FLINK-24082
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.13.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.13.3








[jira] [Created] (FLINK-24062) Exception encountered during timer serialization in Python DataStream API

2021-08-30 Thread Dian Fu (Jira)
Dian Fu created FLINK-24062:
---

 Summary: Exception encountered during timer serialization in 
Python DataStream API 
 Key: FLINK-24062
 URL: https://issues.apache.org/jira/browse/FLINK-24062
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


For the following example:
{code}

#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, 
OutputFileConfig,
   RollingPolicy)


word_count_data = ["To be, or not to be,--that is the question:--",
   "Whether 'tis nobler in the mind to suffer",
   "The slings and arrows of outrageous fortune",
   "Or to take arms against a sea of troubles,",
   "And by opposing end them?--To die,--to sleep,--",
   "No more; and by a sleep to say we end",
   "The heartache, and the thousand natural shocks",
   "That flesh is heir to,--'tis a consummation",
   "Devoutly to be wish'd. To die,--to sleep;--",
   "To sleep! perchance to dream:--ay, there's the rub;",
   "For in that sleep of death what dreams may come,",
   "When we have shuffled off this mortal coil,",
   "Must give us pause: there's the respect",
   "That makes calamity of so long life;",
   "For who would bear the whips and scorns of time,",
   "The oppressor's wrong, the proud man's contumely,",
   "The pangs of despis'd love, the law's delay,",
   "The insolence of office, and the spurns",
   "That patient merit of the unworthy takes,",
   "When he himself might his quietus make",
   "With a bare bodkin? who would these fardels bear,",
   "To grunt and sweat under a weary life,",
   "But that the dread of something after death,--",
   "The undiscover'd country, from whose bourn",
   "No traveller returns,--puzzles the will,",
   "And makes us rather bear those ills we have",
   "Than fly to others that we know not of?",
   "Thus conscience does make cowards of us all;",
   "And thus the native hue of resolution",
   "Is sicklied o'er with the pale cast of thought;",
   "And enterprises of great pith and moment,",
   "With this regard, their currents turn awry,",
   "And lose the name of action.--Soft you now!",
   "The fair Ophelia!--Nymph, in thy orisons",
   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    if input_path is not None:
        ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
        )
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        ds = env.from_collection(word_count_data)

    def split(line):
        yield from line.split()


[jira] [Created] (FLINK-24049) TupleTypeInfo doesn't handle correctly for data types need conversion

2021-08-30 Thread Dian Fu (Jira)
Dian Fu created FLINK-24049:
---

 Summary: TupleTypeInfo doesn't handle correctly for data types 
need conversion
 Key: FLINK-24049
 URL: https://issues.apache.org/jira/browse/FLINK-24049
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0, 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0, 1.12.6, 1.13.3








[jira] [Created] (FLINK-24003) Loopback mode doesn't work when mixing use of Python Table API and Python DataStream API

2021-08-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-24003:
---

 Summary: Loopback mode doesn't work when mixing use of Python 
Table API and Python DataStream API
 Key: FLINK-24003
 URL: https://issues.apache.org/jira/browse/FLINK-24003
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Huang Xingbo
 Fix For: 1.14.0


For the following program:
{code}
import logging
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema


def test_chaining():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    t_env.get_config().get_configuration().set_boolean(
        "python.operator-chaining.enabled", False)

    # 1. create source Table
    t_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '100',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '1000'
        )
    """)

    # 2. create sink Table
    t_env.execute_sql("""
        CREATE TABLE print (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    t_env.execute_sql("""
        CREATE TABLE print_2 (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")

    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))

    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):

        def map1(self, value):
            print('hahah')
            return value

        def map2(self, value):
            print('hahah')
            return value

    ds3 = ds1.connect(ds2).map(MyCoMapFunction(),
                               output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))

    ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(),
                                           Types.STRING()]))

    ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
             .map(lambda i: i,
                  output_type=Types.TUPLE([Types.LONG(), Types.STRING(),
                                           Types.STRING()]))

    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()

    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)

    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)

    statement_set.execute().wait()


if __name__ == "__main__":

    start_ts = time.time()
    test_chaining()
    end_ts = time.time()
    print("--- %s seconds ---" % (end_ts - start_ts))
{code}

Loopback mode doesn't work.





[jira] [Created] (FLINK-23994) ArrayDataSerializer and MapDataSerializer doesn't handle correctly for Null values

2021-08-26 Thread Dian Fu (Jira)
Dian Fu created FLINK-23994:
---

 Summary: ArrayDataSerializer and MapDataSerializer doesn't handle 
correctly for Null values
 Key: FLINK-23994
 URL: https://issues.apache.org/jira/browse/FLINK-23994
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.10.0, 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0, 1.12.6, 1.13.3








[jira] [Created] (FLINK-23944) PulsarSourceITCase.testTaskManagerFailure is unstable

2021-08-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23944:
---

 Summary: PulsarSourceITCase.testTaskManagerFailure is unstable
 Key: FLINK-23944
 URL: https://issues.apache.org/jira/browse/FLINK-23944
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Pulsar
Affects Versions: 1.14.0
Reporter: Dian Fu


[https://dev.azure.com/dianfu/Flink/_build/results?buildId=430=logs=f3dc9b18-b77a-55c1-591e-264c46fe44d1=2d3cd81e-1c37-5c31-0ee4-f5d5cdb9324d]

This is from my personal Azure pipeline; however, I'm pretty sure that I have 
not touched any code related to this.


{code:java}
Aug 24 10:44:13 [ERROR] testTaskManagerFailure{TestEnvironment, ExternalContext, ClusterControllable}[1] Time elapsed: 258.397 s <<< FAILURE!
Aug 24 10:44:13 java.lang.AssertionError:
Aug 24 10:44:13
Aug 24 10:44:13 Expected: Records consumed by Flink should be identical to test data and preserve the order in split
Aug 24 10:44:13 but: Mismatched record at position 7: Expected '0W6SzacX7MNL4xLL3BZ8C3ljho4iCydbvxIl' but was 'wVi5JaJpNvgkDEOBRC775qHgw0LyRW2HBxwLmfONeEmr'
Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
Aug 24 10:44:13 at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
Aug 24 10:44:13 at org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase.testTaskManagerFailure(SourceTestSuiteBase.java:271)
{code}





[jira] [Created] (FLINK-23940) Improve the documentation about how to use logging in PyFlink jobs

2021-08-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23940:
---

 Summary: Improve the documentation about how to use logging in 
PyFlink jobs
 Key: FLINK-23940
 URL: https://issues.apache.org/jira/browse/FLINK-23940
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0, 1.12.6, 1.13.3








[jira] [Created] (FLINK-23936) Python UDFs instances are released and reinitialized if there is no input for more than 1 minute

2021-08-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23936:
---

 Summary: Python UDFs instances are released and reinitialized if 
there is no input for more than 1 minute
 Key: FLINK-23936
 URL: https://issues.apache.org/jira/browse/FLINK-23936
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0, 1.11.0, 1.10.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0, 1.12.6, 1.13.3








[jira] [Created] (FLINK-23931) Format the exception message shown in PythonDriver

2021-08-23 Thread Dian Fu (Jira)
Dian Fu created FLINK-23931:
---

 Summary: Format the exception message shown in PythonDriver
 Key: FLINK-23931
 URL: https://issues.apache.org/jira/browse/FLINK-23931
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


PythonDriver shows messages like the following if the job fails to compile:
{code:java}
2021-08-24 13:33:45,014 INFO org.apache.flink.client.python.PythonDriver [] - --- Python Process Started --
2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - Traceback (most recent call last):
2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/code/src/workspace/pyflink-examples/datastream/test_chaining.py", line 147, in <module>
2021-08-24 13:33:47,709 INFO org.apache.flink.client.python.PythonDriver [] - test_chaining_2()
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/code/src/workspace/pyflink-examples/datastream/test_chaining.py", line 141, in test_chaining_2
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - statement_set.execute().wait()
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/table/statement_set.py", line 147, in execute
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - return TableResult(self._j_statement_set.execute())
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - answer, self.gateway_client, self.target_id, self.name)
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 158, in deco
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - raise java_exception
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Sink `default_catalog`.`default_database`.`print_3` does not exists
2021-08-24 13:33:47,710 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:254)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.Iterator$class.foreach(Iterator.scala:891)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
2021-08-24 13:33:47,711 INFO org.apache.flink.client.python.PythonDriver [] - at scala.collection.AbstractTraversable.map(Traversable.scala:104)
{code}

The format could be improved to make it more readable by removing the prefix 
**INFO org.apache.flink.client.python.PythonDriver** in each line.
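
To show what the cleaned-up output would look like, here is a small sketch that strips the prefix from already-captured log lines. It is only a post-processing illustration in Python, assuming the timestamp and logger layout seen in the log above; the actual change would be made where PythonDriver emits the messages, and `strip_driver_prefix` is a hypothetical helper name.

```python
import re

# Matches "<date> <time>,<millis> INFO <logger> [] - " at the start of a line.
_DRIVER_PREFIX = re.compile(
    r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3} INFO "
    r"org\.apache\.flink\.client\.python\.PythonDriver \[\] - ?"
)


def strip_driver_prefix(line):
    """Return the line without the PythonDriver log prefix, if present."""
    return _DRIVER_PREFIX.sub("", line, count=1)
```

Lines that do not start with the prefix are returned unchanged, so the helper can be applied to mixed output.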





[jira] [Created] (FLINK-23929) Chaining optimization doesn't handle properly for transformations with multiple inputs

2021-08-23 Thread Dian Fu (Jira)
Dian Fu created FLINK-23929:
---

 Summary: Chaining optimization doesn't handle properly for 
transformations with multiple inputs
 Key: FLINK-23929
 URL: https://issues.apache.org/jira/browse/FLINK-23929
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23819) Testing tgz file for python archives

2021-08-16 Thread Dian Fu (Jira)
Dian Fu created FLINK-23819:
---

 Summary: Testing tgz file for python archives
 Key: FLINK-23819
 URL: https://issues.apache.org/jira/browse/FLINK-23819
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23818) Add documentation about tgz files support for python archives

2021-08-16 Thread Dian Fu (Jira)
Dian Fu created FLINK-23818:
---

 Summary: Add documentation about tgz files support for python 
archives
 Key: FLINK-23818
 URL: https://issues.apache.org/jira/browse/FLINK-23818
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23787) Providing utility class for initializing various process function test harnesses

2021-08-15 Thread Dian Fu (Jira)
Dian Fu created FLINK-23787:
---

 Summary: Providing utility class for initializing various process 
function test harnesses
 Key: FLINK-23787
 URL: https://issues.apache.org/jira/browse/FLINK-23787
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu


https://lists.apache.org/thread.html/r9b058976ebaf8fd25f35c0116f6e3ae564cea7d01eea150a686e65f5%40%3Cuser.flink.apache.org%3E





[jira] [Created] (FLINK-23754) Testing Python DataStream API chaining functionality

2021-08-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-23754:
---

 Summary: Testing Python DataStream API chaining functionality
 Key: FLINK-23754
 URL: https://issues.apache.org/jira/browse/FLINK-23754
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23753) Add documentation about Python DataStream API chaining optimization

2021-08-13 Thread Dian Fu (Jira)
Dian Fu created FLINK-23753:
---

 Summary: Add documentation about Python DataStream API chaining 
optimization
 Key: FLINK-23753
 URL: https://issues.apache.org/jira/browse/FLINK-23753
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23619) Remove PythonTimestampsAndWatermarksOperator

2021-08-04 Thread Dian Fu (Jira)
Dian Fu created FLINK-23619:
---

 Summary: Remove PythonTimestampsAndWatermarksOperator
 Key: FLINK-23619
 URL: https://issues.apache.org/jira/browse/FLINK-23619
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


The functionality of this operator could be implemented using the existing 
operators. So we could remove it.





[jira] [Created] (FLINK-23616) Support to chain for the Python operators as much as possible

2021-08-04 Thread Dian Fu (Jira)
Dian Fu created FLINK-23616:
---

 Summary: Support to chain for the Python operators as much as 
possible
 Key: FLINK-23616
 URL: https://issues.apache.org/jira/browse/FLINK-23616
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Affects Versions: 1.14.0
Reporter: Dian Fu
Assignee: Dian Fu








[jira] [Created] (FLINK-23584) Introduce PythonCoProcessOperator and remove PythonCoFlatMapOperator & PythonCoMapOperator

2021-08-02 Thread Dian Fu (Jira)
Dian Fu created FLINK-23584:
---

 Summary: Introduce PythonCoProcessOperator and remove 
PythonCoFlatMapOperator & PythonCoMapOperator
 Key: FLINK-23584
 URL: https://issues.apache.org/jira/browse/FLINK-23584
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23578) Remove Python operators PythonFlatMapOperator/PythonMapOperator/PythonPartitionCustomOperator and use PythonProcessOperator instead

2021-08-02 Thread Dian Fu (Jira)
Dian Fu created FLINK-23578:
---

 Summary: Remove Python operators 
PythonFlatMapOperator/PythonMapOperator/PythonPartitionCustomOperator and use 
PythonProcessOperator instead
 Key: FLINK-23578
 URL: https://issues.apache.org/jira/browse/FLINK-23578
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23445) Separate data and timer connection into different channels for Python Table API operators

2021-07-20 Thread Dian Fu (Jira)
Dian Fu created FLINK-23445:
---

 Summary: Separate data and timer connection into different 
channels for Python Table API operators
 Key: FLINK-23445
 URL: https://issues.apache.org/jira/browse/FLINK-23445
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


This is a follow-up of FLINK-23401 where we split the data and timer into 
different channels for Python DataStream operators. 





[jira] [Created] (FLINK-23421) Unify the space handling in RowData CSV parser

2021-07-18 Thread Dian Fu (Jira)
Dian Fu created FLINK-23421:
---

 Summary: Unify the space handling in RowData CSV parser
 Key: FLINK-23421
 URL: https://issues.apache.org/jira/browse/FLINK-23421
 Project: Flink
  Issue Type: Bug
Reporter: Dian Fu


Currently, it calls `trim()` for types such as LocalDateTime, Boolean, 
[Int|https://github.com/apache/flink/blob/41ce9ccbf42537a854087b6ba33a61092a04538f/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L196],
 etc.; however, it doesn't call `trim()` for other types such as 
[Decimal|https://github.com/apache/flink/blob/41ce9ccbf42537a854087b6ba33a61092a04538f/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvToRowDataConverters.java#L285],
 etc. We should unify the behavior.
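
One way to get uniform behavior is to apply the trimming in a single wrapper instead of inside each per-type converter. The following is a language-neutral sketch of that idea written in Python; it does not mirror the Java code in `CsvToRowDataConverters`, and `trimmed`, `parse_bool`, `CONVERTERS` and `convert_field` are hypothetical names.

```python
from decimal import Decimal


def trimmed(parse):
    """Wrap a parser so surrounding whitespace is always removed first."""
    def convert(field):
        return parse(field.strip())
    return convert


def parse_bool(text):
    # Strict parser: on its own it would not accept " true ".
    if text.lower() not in ("true", "false"):
        raise ValueError("not a boolean: %r" % text)
    return text.lower() == "true"


# Every type goes through the same wrapper, so no type can miss the trim.
CONVERTERS = {
    "INT": trimmed(int),
    "BOOLEAN": trimmed(parse_bool),
    "DECIMAL": trimmed(Decimal),
}


def convert_field(type_name, raw):
    """Convert one raw CSV field to the value for the given type name."""
    return CONVERTERS[type_name](raw)
```

Registering converters only through `trimmed` keeps the whitespace policy in one place, which is the property the issue asks for.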





[jira] [Created] (FLINK-23401) Separate data and timer connection into different channel

2021-07-15 Thread Dian Fu (Jira)
Dian Fu created FLINK-23401:
---

 Summary: Separate data and timer connection into different channel
 Key: FLINK-23401
 URL: https://issues.apache.org/jira/browse/FLINK-23401
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-23400) Support to decode a single record for the Python coder

2021-07-15 Thread Dian Fu (Jira)
Dian Fu created FLINK-23400:
---

 Summary: Support to decode a single record for the Python coder
 Key: FLINK-23400
 URL: https://issues.apache.org/jira/browse/FLINK-23400
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0


Currently, the Python coder always outputs a Python generator. This makes it 
impossible to separate the timers and the data into different channels.
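
The difference between generator-style decoding and decoding a single record can be sketched as follows. The length-prefixed UTF-8 wire format and the names `encode`, `decode_one` and `decode_all` are assumptions made for this example; they are not the format or API of the PyFlink coders.

```python
import io
import struct


def encode(record):
    """Encode a string as a 4-byte big-endian length followed by UTF-8 data."""
    data = record.encode("utf-8")
    return struct.pack(">I", len(data)) + data


def decode_one(stream):
    """Decode exactly one record and leave the rest of the stream untouched."""
    header = stream.read(4)
    if not header:
        raise EOFError("no more records")
    if len(header) < 4:
        raise ValueError("truncated length header")
    (length,) = struct.unpack(">I", header)
    payload = stream.read(length)
    if len(payload) < length:
        raise ValueError("truncated record")
    return payload.decode("utf-8")


def decode_all(stream):
    """Generator-style decoding, built on top of the single-record decoder."""
    while True:
        try:
            yield decode_one(stream)
        except EOFError:
            return
```

With a single-record entry point, the caller decides per record which channel to read from next, while the generator form remains available for plain data streams.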





[jira] [Created] (FLINK-23213) Remove ProcessFunctionOperation

2021-07-01 Thread Dian Fu (Jira)
Dian Fu created FLINK-23213:
---

 Summary: Remove ProcessFunctionOperation
 Key: FLINK-23213
 URL: https://issues.apache.org/jira/browse/FLINK-23213
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


Currently there are three operations to support the Python DataStream API: 
ProcessFunctionOperation, DataStreamKeyedStatefulOperation, and 
DataStreamStatelessFunctionOperation. We could refactor this a bit to merge 
ProcessFunctionOperation and DataStreamStatelessFunctionOperation into one.





[jira] [Created] (FLINK-23166) ZipUtils doesn't handle properly for softlinks inside the zip file

2021-06-28 Thread Dian Fu (Jira)
Dian Fu created FLINK-23166:
---

 Summary: ZipUtils doesn't handle properly for softlinks inside the 
zip file
 Key: FLINK-23166
 URL: https://issues.apache.org/jira/browse/FLINK-23166
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.10.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.11.4, 1.14.0, 1.12.5, 1.13.2








[jira] [Created] (FLINK-23141) Support KafkaSerializationSchema in Kafka connector for Python DataStream API

2021-06-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23141:
---

 Summary: Support KafkaSerializationSchema in Kafka connector for 
Python DataStream API
 Key: FLINK-23141
 URL: https://issues.apache.org/jira/browse/FLINK-23141
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0


See 
[https://lists.apache.org/thread.html/r8e4a51a1df7b49a2a5cbcf2432559ae16510c6224a4a5dd8f2844d39%40%3Cuser.flink.apache.org%3E]
 for more details.





[jira] [Created] (FLINK-23138) Raise an exception if types other than PickledBytesTypeInfo are specified for state descriptor

2021-06-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23138:
---

 Summary: Raise an exception if types other than 
PickledBytesTypeInfo are specified for state descriptor 
 Key: FLINK-23138
 URL: https://issues.apache.org/jira/browse/FLINK-23138
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.13.2
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.13.0


Although users are expected to be able to use any type to define the state 
descriptor, only PickledBytesTypeInfo is actually supported for the 
time being. A meaningful exception should be thrown if types other than 
PickledBytesTypeInfo are used.
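
A minimal sketch of such a check follows. The classes below are simplified stand-ins written for this example, not the real PyFlink classes, and the exception type and message are assumptions rather than what the eventual fix uses.

```python
class TypeInformation:
    """Stand-in base class for type information (hypothetical)."""


class PickledBytesTypeInfo(TypeInformation):
    """Stand-in for the only type currently supported for state."""


class StringTypeInfo(TypeInformation):
    """Stand-in for any other, currently unsupported, type."""


class ValueStateDescriptor:
    """Stand-in state descriptor that validates its type eagerly."""

    def __init__(self, name: str, type_info: TypeInformation):
        # Fail fast with a clear message instead of failing later at runtime.
        if not isinstance(type_info, PickledBytesTypeInfo):
            raise TypeError(
                "State descriptor '%s' only supports PickledBytesTypeInfo "
                "for now, got %s." % (name, type(type_info).__name__)
            )
        self.name = name
        self.type_info = type_info
```

Validating in the constructor surfaces the problem when the job is defined rather than when the state is first accessed.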





[jira] [Created] (FLINK-23133) The dependencies are not handled properly when mixing use of Python Table API and Python DataStream API

2021-06-24 Thread Dian Fu (Jira)
Dian Fu created FLINK-23133:
---

 Summary: The dependencies are not handled properly when mixing use 
of Python Table API and Python DataStream API
 Key: FLINK-23133
 URL: https://issues.apache.org/jira/browse/FLINK-23133
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.12.5, 1.13.2


When converting from DataStream to Table, the dependencies need to be handled 
and set correctly for the existing DataStream operators; currently they are not.





[jira] [Created] (FLINK-23120) ByteArrayWrapperSerializer.serialize should use writeInt to serialize the length

2021-06-23 Thread Dian Fu (Jira)
Dian Fu created FLINK-23120:
---

 Summary: ByteArrayWrapperSerializer.serialize should use writeInt 
to serialize the length
 Key: FLINK-23120
 URL: https://issues.apache.org/jira/browse/FLINK-23120
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.12.5, 1.13.2
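

The issue has no description, so the following is only an illustration of why a length prefix is usually written as a 4-byte integer. It assumes the problem is a single-byte write of the length (in Java, `DataOutput.write(int)` writes only the low eight bits of its argument, while `writeInt` writes four bytes). The helper names are hypothetical.

```python
import struct


def write_length_as_byte(length: int) -> bytes:
    # Mimics a single-byte write: only the low 8 bits of the length survive.
    return bytes([length & 0xFF])


def write_length_as_int(length: int) -> bytes:
    # Mimics writeInt: the length is written as a 4-byte big-endian integer.
    return struct.pack(">i", length)


def read_length_from_byte(buf: bytes) -> int:
    return buf[0]


def read_length_from_int(buf: bytes) -> int:
    (length,) = struct.unpack(">i", buf[:4])
    return length
```

For a 300-byte array, the single-byte variant reads back a length of 44, so the reader would consume the wrong number of bytes; the 4-byte variant round-trips correctly.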








[jira] [Created] (FLINK-23033) Support object array in Python DataStream API

2021-06-18 Thread Dian Fu (Jira)
Dian Fu created FLINK-23033:
---

 Summary: Support object array in Python DataStream API
 Key: FLINK-23033
 URL: https://issues.apache.org/jira/browse/FLINK-23033
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-22913) Support Python UDF chaining in Python DataStream API

2021-06-08 Thread Dian Fu (Jira)
Dian Fu created FLINK-22913:
---

 Summary: Support Python UDF chaining in Python DataStream API
 Key: FLINK-22913
 URL: https://issues.apache.org/jira/browse/FLINK-22913
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0


Currently, for the following job:
{code}
ds = ..
ds.map(map_func1)
    .map(map_func2)
{code}

The Python functions `map_func1` and `map_func2` will run in separate Python 
workers, and the result of `map_func1` will be transferred to the JVM and then 
transferred to `map_func2`, which may reside in another Python worker. This 
introduces redundant communication and serialization/deserialization overhead.
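
Conceptually, chaining means that consecutive functions are fused and executed in the same Python worker. The sketch below is plain Python with no Flink involved; the `chain` helper and the bodies of `map_func1` and `map_func2` are made up for illustration and say nothing about how PyFlink implements chaining internally.

```python
from typing import Any, Callable


def chain(*funcs: Callable[[Any], Any]) -> Callable[[Any], Any]:
    # Fuse several map functions into one, so the intermediate result is
    # passed along in-process instead of being serialized between workers.
    def chained(value: Any) -> Any:
        for func in funcs:
            value = func(value)
        return value

    return chained


def map_func1(x: int) -> int:
    # Example body, chosen only for the sketch.
    return x + 1


def map_func2(x: int) -> int:
    # Example body, chosen only for the sketch.
    return x * 2


fused = chain(map_func1, map_func2)
results = [fused(x) for x in [1, 2, 3]]
```

Here `fused` applies `map_func1` and then `map_func2` to each element without any round trip between the two calls.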





[jira] [Created] (FLINK-22912) Support state ttl in Python DataStream API

2021-06-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22912:
---

 Summary: Support state ttl in Python DataStream API
 Key: FLINK-22912
 URL: https://issues.apache.org/jira/browse/FLINK-22912
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-22911) Align FLIP-136 (Improve interoperability between DataStream and Table API) in PyFlink Table API

2021-06-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22911:
---

 Summary: Align FLIP-136 (Improve interoperability between 
DataStream and Table API) in PyFlink Table API
 Key: FLINK-22911
 URL: https://issues.apache.org/jira/browse/FLINK-22911
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
Assignee: Dian Fu
 Fix For: 1.14.0








[jira] [Created] (FLINK-22733) Type mismatch thrown in KeyedStream.union in Python DataStream API

2021-05-20 Thread Dian Fu (Jira)
Dian Fu created FLINK-22733:
---

 Summary: Type mismatch thrown in KeyedStream.union in Python 
DataStream API
 Key: FLINK-22733
 URL: https://issues.apache.org/jira/browse/FLINK-22733
 Project: Flink
  Issue Type: Improvement
  Components: API / Python
Affects Versions: 1.13.0, 1.12.0
Reporter: Dian Fu
 Fix For: 1.13.1, 1.12.5


See 
[http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/PyFlink-DataStream-union-type-mismatch-td43855.html]
 for more details.





[jira] [Created] (FLINK-22589) There are quite a few Python classes doesn't appear in the Python docs

2021-05-07 Thread Dian Fu (Jira)
Dian Fu created FLINK-22589:
---

 Summary: There are quite a few Python classes doesn't appear in 
the Python docs
 Key: FLINK-22589
 URL: https://issues.apache.org/jira/browse/FLINK-22589
 Project: Flink
  Issue Type: Improvement
  Components: API / Python, Documentation
Reporter: Dian Fu
 Fix For: 1.13.1


For example, SerializationSchema and DeserializationSchema don't appear in 
the Python docs: 
[https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/python/]

Users have to turn to the source code to find out how to use these classes.




