Re: Flink Python API and HADOOP_CLASSPATH

2021-05-18 Thread Eduard Tudenhoefner
Hi Dian,

thanks a lot for the explanation and help. Option 2) is what I needed and
it works.

Regards
Eduard

On Tue, May 18, 2021 at 6:21 AM Dian Fu  wrote:

> Hi,
>
> 1) The cause of the exception:
> The dependencies added via pipeline.jars / pipeline.classpaths will be
> used to construct user class loader.
>
> For your job, the exception happens when
> HadoopUtils.getHadoopConfiguration is called. The reason is that
> HadoopUtils is provided by Flink and thus loaded by the system classloader
> instead of the user classloader. As the Hadoop dependencies are only
> available in the user classloader, a ClassNotFoundException was raised.
>
> So the Hadoop dependencies are a little more special than the other
> dependencies, as Flink itself also depends on them, not only the user code.
>
> 2) How to support HADOOP_CLASSPATH in local mode (e.g. execute in IDE)
>
> Currently, you have to copy the hadoop jars into the PyFlink installation
> directory: site-packages/pyflink/lib/
> However, it makes sense to support the HADOOP_CLASSPATH environment variable
> in local mode to avoid copying the jars manually. I will create a ticket for
> this.
>
> 3) How to support HADOOP_CLASSPATH in remote mode (e.g. submitted via
> `flink run`)
> You could export HADOOP_CLASSPATH=`hadoop classpath` manually before
> executing `flink run`.
>
> Regards,
> Dian
>
> 2021年5月17日 下午8:45,Eduard Tudenhoefner  写道:
>
> Hello,
>
> I was wondering whether anyone has tried and/or had any luck creating a
> custom catalog with Iceberg + Flink via the Python API (
> https://iceberg.apache.org/flink/#custom-catalog)?
>
> When doing so, the docs mention that dependencies need to be specified via
> *pipeline.jars* / *pipeline.classpaths* (
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/dependency_management/).
> I'm providing the iceberg flink runtime + the hadoop libs via those and I
> can confirm that I'm landing in the *FlinkCatalogFactory* but then things
> fail because it doesn't see the *hadoop dependencies* for some reason.
>
> What would be the right way to provide the *HADOOP_CLASSPATH* when using
> the Python API? I have a minimal code example that shows the issue here:
> https://gist.github.com/nastra/92bc3bc7b7037d956aa5807988078b8d#file-flink-py-L38
>
>
> Thanks
>
>
>


Re: SIGSEGV error

2021-05-18 Thread Joshua Fan
Hi Till,
I also tried the job without gzip; it ran into the same error.
But the problem is solved now. I was about to give up on solving it when I
found the mail at
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-crash-SIGSEGV-in-ZIP-GetEntry-td17326.html.
So I think maybe it was something about the serialization stuff.
What I have done is :
before:

OperatorStateStore stateStore = context.getOperatorStateStore();
ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", State.class);

after:

OperatorStateStore stateStore = context.getOperatorStateStore();
ListStateDescriptor<State> lsd = new ListStateDescriptor<>("bucket-states", new JavaSerializer<>());
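
For context, this descriptor lives in initializeState() of the sink function. A
simplified sketch of the surrounding code (the class below is illustrative, not
our actual sink; State stands in for our own serializable per-bucket POJO):

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BucketingSinkFunction implements CheckpointedFunction {

    // Stand-in for our actual per-bucket state POJO.
    public static class State implements java.io.Serializable {}

    private transient ListState<State> bucketStates;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Explicit JavaSerializer instead of new ListStateDescriptor<>("bucket-states", State.class),
        // which is the change that made the ZIP_GetEntry crashes disappear for us.
        ListStateDescriptor<State> lsd =
                new ListStateDescriptor<>("bucket-states", new JavaSerializer<>());
        bucketStates = context.getOperatorStateStore().getListState(lsd);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // snapshotting of the buckets omitted in this sketch
    }
}
```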

Hope this is helpful.

Yours sincerely
Josh



Till Rohrmann  于2021年5月18日周二 下午2:54写道:

> Hi Joshua,
>
> could you try whether the job also fails when not using the gzip format?
> This could help us narrow down the culprit. Moreover, you could try to run
> your job and Flink with Java 11 now.
>
> Cheers,
> Till
>
> On Tue, May 18, 2021 at 5:10 AM Joshua Fan  wrote:
>
>> Hi all,
>>
>> Most of the posts says that "Most of the times, the crashes in
>> ZIP_GetEntry occur when the jar file being accessed has been
>> modified/overwritten while the JVM instance was running. ", but do not
>> know when and which jar file was modified according to the job running in
>> flink.
>>
>> for your information.
>>
>> Yours sincerely
>> Josh
>>
>> Joshua Fan  于2021年5月18日周二 上午10:15写道:
>>
>>> Hi Stephan, Till
>>>
>>> Recently, I tried to upgrade a flink job from 1.7 to 1.11,
>>> unfortunately, the weird problem appeared, " SIGSEGV (0xb) at
>>> pc=0x0025, pid=135306, tid=140439001388800".  The pid log is
>>> attached.
>>> Actually, it is a simple job that consumes messages from kafka and
>>> writes into hdfs with a gzip format. It can run in 1.11 for about 2
>>> minutes, then the JVM will crash, then job restart and jvm crash again
>>> until the application fails.
>>> I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns
>>> out helpless, the same crash keeps happening. Google suggests to upgrade
>>> jdk to jdk1.9, but it is not feasible.
>>> Any suggestions? Thanks a lot.
>>>
>>> Yours sincerely
>>> Josh
>>>
>>> Stephan Ewen  于2019年9月13日周五 下午11:11写道:
>>>
 Given that the segfault happens in the JVM's ZIP stream code, I am
 curious if this is a bug in Flink or in the JVM core libs that happens to
 be triggered now by newer versions of Flink.

 I found this on StackOverflow, which looks like it could be related:
 https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
 Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?


 On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
 wrote:

> Hi Marek,
>
> could you share the logs statements which happened before the SIGSEGV
> with us? They might be helpful to understand what happened before.
> Moreover, it would be helpful to get access to your custom serializer
> implementations. I'm also pulling in Gordon who worked on
> the TypeSerializerSnapshot improvements.
>
> Cheers,
> Till
>
> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:
>
>> Hi everyone,
>>
>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>> upgrade our task managers started to fail with SIGSEGV error from time to
>> time.
>>
>> In the process of adjusting the code to 1.8.1, we noticed that there were
>> some changes around the TypeSerializerSnapshot interface and its
>> implementations. At that time we had a few custom serializers which we
>> decided to throw out during migration and then leverage Flink's default
>> serializers. We don't mind clearing the state in the process of migration;
>> the effort to migrate with state does not seem worth it.
>>
>> Unfortunately after running new version we see SIGSEGV errors from
>> time to time. It may be that serialization is not the real cause, but at
>> the moment it seems to be the most probable reason. We have not performed
>> any significant code changes besides serialization area.
>>
>> We run job on yarn, hdp version 2.7.3.2.6.2.0-205.
>> Checkpoint configuration: RocksDB backend, not incremental, 50s min
>> processing time
>>
>> You can find parts of JobManager log and ErrorFile log of failed
>> container included below.
>>
>> Any suggestions are welcome
>>
>> Best regards
>> Marek Maj
>>
>> jobmanager.log
>>
>> 2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Completed checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965
>> (18532488122 bytes in 60871 ms).
>>
>> 2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Triggering checkpoint 48 @ 1568111478177 for job
>> c8a9ae03785ade86348c3189cf7dd965.
>>
>> 2019-09-10

Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-18 Thread Matthias Pohl
Sorry for not getting back earlier. I missed that thread. It looks like
some wrong assumption on our end. Hence, Yangze and Guowei are right. I'm
gonna look into the issue.

Matthias

On Fri, May 14, 2021 at 4:21 AM Guowei Ma  wrote:

> Hi, Gary
>
> I think it might be a bug, so would you like to open a JIRA for this?
> And could you share the exception in which the TaskManagerLocation is null?
> It might be very helpful to verify the cause.
>
> Best,
> Guowei
>
>
> On Thu, May 13, 2021 at 10:36 AM Yangze Guo  wrote:
>
>> Hi, it seems to be related to FLINK-22276. Thus, I'd involve Matthias
>> to take a look.
>>
>> @Matthias My gut feeling is that not every execution that has a failureInfo
>> has been deployed?
>>
>> Best,
>> Yangze Guo
>>
>> On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
>> >
>> > Hi,
>> >
>> > We have upgraded our Flink applications to 1.13.0, but we found that the
>> Root Exception cannot be shown on the Web UI (we get an internal server error
>> message). After opening the browser development console and tracing the
>> message, we found that there is an exception in the jobmanager:
>> >
>> > 2021-05-12 13:30:45,589 ERROR
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
>> Unhandled exception.
>> > java.lang.IllegalArgumentException: The location must not be null for a
>> non-global failure.
>> > at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>> ~[?:?]
>> > at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
>> > at
>> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
>> ~[?:?]
>> > at
>> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
>> ~[?:?]
>> > at
>> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
>> ~[?:?]
>> > at
>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
>> > at
>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>> ~[?:?]
>> > at
>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>> ~[?:?]
>> > at
>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>> > at
>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>> ~[?:?]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>> > at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>> [?:?]
>> > at
>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>> [?:?]
>> > at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> [?:?]
>> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>> > at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>> [?:?]
>> > at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> [?:?]
>> > at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> [?:?]
>> > at java.lang.Thread.run(Thread.java:834) [?:?]
>> >
>> > We would like to check: is there any configuration change that should be
>> done for the application? Thanks!
>> >
>> > Regards,
>> > -Gary
>> >
>> >
>> >

PyFlink UDF: No match found for function signature XXX

2021-05-18 Thread Yik San Chan
Hi,

I have a PyFlink script that fails to use a simple UDF. The full script can
be found below:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
DataTypes,
EnvironmentSettings,
SqlDialect,
StreamTableEnvironment,
)
from pyflink.table.udf import udf


@udf(
input_types=[DataTypes.INT(), DataTypes.INT()],
result_type=DataTypes.BIGINT(),
)
def add(i, j):
return i + j


TRANSFORM = """
INSERT INTO aiinfra.mysink
SELECT ADD(a, b)
FROM aiinfra.mysource
"""

CREATE_CATALOG = """
CREATE CATALOG hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
)"""

USE_CATALOG = "USE CATALOG hive"


exec_env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
t_env = StreamTableEnvironment.create(
stream_execution_environment=exec_env, environment_settings=env_settings
)

t_env.create_temporary_function("add", add)

t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
t_env.execute_sql(CREATE_CATALOG)
t_env.execute_sql(USE_CATALOG)

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_result = t_env.execute_sql(TRANSFORM)
```

However, when I submit the python file to my flink cluster, it throws
exception:

```
[INFO] 2021-05-18 17:27:47.758  - [taskAppId=TASK-90019-86729-380519]:[152]
-  -> Traceback (most recent call last):
 File "aiinfra/batch_example.py", line 50, in 
   t_result = t_env.execute_sql(TRANSFORM)
 File
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
line 766, in execute_sql
 File
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
line 1286, in __call__
 File
"/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 147, in deco
 File
"/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 3, column 8 to line 3, column 16: No match found for function
signature ADD(, )
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```

It seems it has difficulties knowing that the "add" function has already been
registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, therefore I
don't think it is an upper-or-lowercase issue.

Also, if I replace "ADD(a, b)" with the simple "a + b", the script produces
exactly what I need.

Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has 2
columns, a bigint and b bigint. aiinfra.mysink has 1 column, c bigint.

Any help? Thanks!

Best,
Yik San


Re: PyFlink UDF: No match found for function signature XXX

2021-05-18 Thread Dian Fu
Hi Yik San,

The expected input types for add are DataTypes.INT, however, the schema of 
aiinfra.mysource is: a bigint and b bigint.

Regards,
Dian

> 2021年5月18日 下午5:38,Yik San Chan  写道:
> 
> Hi,
> 
> I have a PyFlink script that fails to use a simple UDF. The full script can 
> be found below:
> 
> ```python
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import (
> DataTypes,
> EnvironmentSettings,
> SqlDialect,
> StreamTableEnvironment,
> )
> from pyflink.table.udf import udf
> 
> 
> @udf(
> input_types=[DataTypes.INT(), DataTypes.INT()],
> result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
> return i + j
> 
> 
> TRANSFORM = """
> INSERT INTO aiinfra.mysink
> SELECT ADD(a, b)
> FROM aiinfra.mysource
> """
> 
> CREATE_CATALOG = """
> CREATE CATALOG hive WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
> )"""
> 
> USE_CATALOG = "USE CATALOG hive"
> 
> 
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(
> stream_execution_environment=exec_env, environment_settings=env_settings
> )
> 
> t_env.create_temporary_function("add", add)
> 
> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
> t_env.execute_sql(CREATE_CATALOG)
> t_env.execute_sql(USE_CATALOG)
> 
> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
> t_result = t_env.execute_sql(TRANSFORM)
> ```
> 
> However, when I submit the python file to my flink cluster, it throws 
> exception:
> 
> ```
> [INFO] 2021-05-18 17:27:47.758  - [taskAppId=TASK-90019-86729-380519]:[152] - 
>  -> Traceback (most recent call last):
>  File "aiinfra/batch_example.py", line 50, in 
>t_result = t_env.execute_sql(TRANSFORM)
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>  line 766, in execute_sql
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>  line 1286, in __call__
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>  line 147, in deco
>  File 
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>  line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 3, column 8 to line 3, column 16: No match found for function signature 
> ADD(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
> at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at 
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at 
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at 
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> ```
> 
> Seems it has difficulties knowing the "add" function has already been 
> registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, therefore I 
> don't think it is a upper-or-lower case issue.
> 
> Also, if I replace "ADD(a, b)" with the simple "a + b", the script produces 
> exactly what I need.
> 
> Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has 2 
> columns, a bigint and b bigint. aiinfra.mysink has 1 column, c bigint.
> 
> Any help? Thanks!
> 
> Best,
> Yik San



Re: Getting error in pod template

2021-05-18 Thread Yang Wang
Could you share how you are starting the Flink native K8s application with
the pod template?

Usually it looks like the following command, and you need to have the Flink
binary on your local machine.
Please note that the pod template works with native K8s mode only, and you
cannot use "kubectl" to directly apply the pod template.

bin/flink run-application -t kubernetes-application \
-Dkubernetes.cluster-id=my-flink-native-k8s-app \
-Dkubernetes.container.image=flink:1.13 \
-Djobmanager.memory.process.size=1088m \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dkubernetes.taskmanager.cpu=0.5 \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.pod-template-file=/path/of/kubernetes-pod-template.yaml \
local:///opt/flink/examples/streaming/StateMachineExample.jar


Best,
Yang

Priyanka Manickam  于2021年5月18日周二 下午5:24写道:

> Hi All,
>
> I have used imagePullPolicy: Never, but I am still getting an error in the
> main container.
>
> Could someone please help me out here
>
>
> Thanks,
> Priyanka Manickam
>
>
> On Fri, 14 May 2021, 21:00 Priyanka Manickam, <
> priyanka.manick...@gmail.com> wrote:
>
>> Hi yang,
>>
>> I was using a pod template to fetch the logs to a particular repository.
>>
>> But while deploying I got an error saying "jobmanager-pod-template" is
>> invalid: spec.containers(0).image: required value.
>>
>> And if I try to add an image for flink-main-container, it gives an
>> ImagePullBackOff error.
>>
>> Am I proceeding in the correct way? In the official Flink documentation,
>> no image is added after the flink-main-container.
>>
>> Could you please help with this? I have also searched for demo videos on
>> using the pod template with Flink native Kubernetes but could not find
>> any. If you could share any demo videos on the website, it would be very
>> useful for everyone.
>>
>> Good year ahead..
>>
>> Thanks,
>> Priyanka Manickam.
>>
>>
>>


Re: two questions about flink stream processing: kafka sources and TimerService

2021-05-18 Thread Ingo Bürk
Hi Jin,

1) As far as I know the order is only guaranteed for events from the same
partition. If you want events across partitions to remain in order you may
need to use parallelism 1. I'll attach some links here which might be
useful:

https://stackoverflow.com/questions/50340107/order-of-events-with-chained-keyby-calls-on-same-key
https://stackoverflow.com/questions/44156774/ordering-of-records-in-a-keyed-stream-in-flink
https://stackoverflow.com/questions/50573174/flink-kafka-producer-elements-out-of-order

2) Indeed there doesn't seem to be a way to access the InternalTimerService
from a ProcessFunction at the moment. One approach could be to implement
this yourself using a MapState. Otherwise I think you need to implement
your own operator from which you can then access InternalTimerService
similar to how KeyedCoProcessOperator does it as well.
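
To illustrate the MapState idea, here is a rough, untested sketch of how you
could keep one buffer per side and clean up only the entries that expire in
onTimer (types, retention intervals and the actual join logic are placeholders;
a real implementation would also handle multiple events per timestamp):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// String stands in for your actual key/event/output types.
public class PerSideCleanupFunction extends KeyedCoProcessFunction<String, String, String, String> {

    // Different retention per side; the offset also tells onTimer which entries expire.
    private static final long LEFT_RETENTION_MS = 60_000L;
    private static final long RIGHT_RETENTION_MS = 10_000L;

    // One buffer per side, keyed by the event timestamp.
    private transient MapState<Long, String> leftBuffer;
    private transient MapState<Long, String> rightBuffer;

    @Override
    public void open(Configuration parameters) {
        leftBuffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("left-buffer", Long.class, String.class));
        rightBuffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("right-buffer", Long.class, String.class));
    }

    @Override
    public void processElement1(String left, Context ctx, Collector<String> out) throws Exception {
        leftBuffer.put(ctx.timestamp(), left);
        // ... join logic against rightBuffer omitted ...
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + LEFT_RETENTION_MS);
    }

    @Override
    public void processElement2(String right, Context ctx, Collector<String> out) throws Exception {
        rightBuffer.put(ctx.timestamp(), right);
        // ... join logic against leftBuffer omitted ...
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + RIGHT_RETENTION_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Only the entries whose retention expires at this timestamp are touched,
        // so there is no need to iterate over the full state of both sides.
        leftBuffer.remove(timestamp - LEFT_RETENTION_MS);
        rightBuffer.remove(timestamp - RIGHT_RETENTION_MS);
    }
}
```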


Regards
Ingo

On Wed, May 12, 2021 at 8:32 AM Jin Yi  wrote:

> hello.  thanks ahead of time for anyone who answers.
>
> 1.  verifying my understanding: for a kafka source that's partitioned on
> the same piece of data that is later used in a keyBy, if we are relying on
> the kafka timestamp as the event timestamp, is it guaranteed that the event
> stream of the source is in the kafka pipeline's insertion order for the
> topic?
>
> 2.  is there a way to use the InternalTimerService from within a
> ProcessFunction (specifically, a KeyedCoProcessFunction)?  i don't see an
> easy way to do this, except by changing the TimerService interface.  the
> use case for my need is that i'd like to have timers to clean up the left
> and right keyed state using namespaced timers like how IntervalJoin does it
> (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L256).
> right now, b/c the KeyedCoProcessFunction only gives us the
> SimpleTimerService via the Context, i can only trigger onTimer execution
> without being able to refine the cleaning of state to just the event state
> of the side that a timer was originated from.  without this, it'll end up
> needing to visit state associated with both event streams which isn't
> performant as those streams can have different throughputs (and therefore,
> expect to have different retention characteristics/needs).
>
> thanks.
>


Re: PyFlink UDF: No match found for function signature XXX

2021-05-18 Thread Yik San Chan
Hi Dian,

I changed the udf to:

```python
@udf(
input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
result_type=DataTypes.BIGINT(),
)
def add(i, j):
return i + j
```

But I still get the same error.

On Tue, May 18, 2021 at 5:47 PM Dian Fu  wrote:

> Hi Yik San,
>
> The expected input types for add are DataTypes.INT, however, the schema of
> aiinfra.mysource is: a bigint and b bigint.
>
> Regards,
> Dian
>
> 2021年5月18日 下午5:38,Yik San Chan  写道:
>
> Hi,
>
> I have a PyFlink script that fails to use a simple UDF. The full script
> can be found below:
>
> ```python
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import (
> DataTypes,
> EnvironmentSettings,
> SqlDialect,
> StreamTableEnvironment,
> )
> from pyflink.table.udf import udf
>
>
> @udf(
> input_types=[DataTypes.INT(), DataTypes.INT()],
> result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
> return i + j
>
>
> TRANSFORM = """
> INSERT INTO aiinfra.mysink
> SELECT ADD(a, b)
> FROM aiinfra.mysource
> """
>
> CREATE_CATALOG = """
> CREATE CATALOG hive WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
> )"""
>
> USE_CATALOG = "USE CATALOG hive"
>
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(
> stream_execution_environment=exec_env, environment_settings=env_settings
> )
>
> t_env.create_temporary_function("add", add)
>
> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
> t_env.execute_sql(CREATE_CATALOG)
> t_env.execute_sql(USE_CATALOG)
>
> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
> t_result = t_env.execute_sql(TRANSFORM)
> ```
>
> However, when I submit the python file to my flink cluster, it throws
> exception:
>
> ```
> [INFO] 2021-05-18 17:27:47.758  -
> [taskAppId=TASK-90019-86729-380519]:[152] -  -> Traceback (most recent call
> last):
>  File "aiinfra/batch_example.py", line 50, in 
>t_result = t_env.execute_sql(TRANSFORM)
>  File
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 766, in execute_sql
>  File
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>  File
> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>  File
> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 8 to line 3, column 16: No match found for function
> signature ADD(, )
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> 
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
> ```
>
> Seems it has difficulties knowing the "add" function has already been
> registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, therefore I
> don't think it is a upper-or-lower case issue.
>
> Also, if I replace "ADD(a, b)" with the simple "a + b", the script
> produces exactly what I need.
>
> Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has 2
> columns, a bigint and b

Stop command failure

2021-05-18 Thread V N, Suchithra (Nokia - IN/Bangalore)
Hi,

The stop command is failing with the below error on Apache Flink 1.12.3.
Could you please help?

log":"[Flink-RestClusterClient-IO-thread-2] 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel Force-closing a 
channel whose registration task was not accepted by an event loop: [id: 
0x4fb1c35c]"}
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:471)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:421) 
~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:344) 
~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:258) 
~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$23(RestClusterClient.java:777)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
 [?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
[?:?]
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610) 
[?:?]
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
 [?:?]
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
 [?:?]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 
[?:?]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 
[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
{"type":"log","host":"${env:CLOG_HOST}","level":"ERROR","systemid":"${env:CLOG_SYSTEMID}","system":"${env:CLOG_SYSTEM}","time":"2021-05-18T10:32:04.934Z","timezone":"UTC","log":"[Flink-RestClusterClient-IO-thread-2]
 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
 Failed to submit a listener notification task. Event loop shut down?"}
java.util.concurrent.RejectedExecutionException: event executor terminated
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
 ~[flink-dist_2.11-1.12.3.jar:1.12.3]
at 
org.apache.flink.shaded.netty4.io.nett

Flink upgraded from 1.10.0 to 1.12.0

2021-05-18 Thread 王炳焱
When I upgraded from Flink 1.10.0 to Flink 1.12.0, I was unable to restore the
savepoint, and it prompted the following error:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:299)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitialize

Re: PyFlink UDF: No match found for function signature XXX

2021-05-18 Thread Yik San Chan
With the help from Dian and friends, it turns out the root cause is:

When `create_temporary_function` is called, the current catalog is the default
catalog. However, when `execute_sql(TRANSFORM)` runs, the current catalog is
"hive". A function registered as a temporary function in the "default" catalog
is not accessible from the "hive" catalog.

To solve the problem, simply replace `create_temporary_function` with
`create_temporary_system_function` so that the function is accessible from
other catalogs as well.

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/overview/

Best,
Yik San

On Tue, May 18, 2021 at 6:43 PM Yik San Chan 
wrote:

> Hi Dian,
>
> I changed the udf to:
>
> ```python
> @udf(
> input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
> result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
> return i + j
> ```
>
> But I still get the same error.
>
> On Tue, May 18, 2021 at 5:47 PM Dian Fu  wrote:
>
>> Hi Yik San,
>>
>> The expected input types for add are DataTypes.INT, however, the schema
>> of aiinfra.mysource is: a bigint and b bigint.
>>
>> Regards,
>> Dian
>>
>> 2021年5月18日 下午5:38,Yik San Chan  写道:
>>
>> Hi,
>>
>> I have a PyFlink script that fails to use a simple UDF. The full script
>> can be found below:
>>
>> ```python
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import (
>> DataTypes,
>> EnvironmentSettings,
>> SqlDialect,
>> StreamTableEnvironment,
>> )
>> from pyflink.table.udf import udf
>>
>>
>> @udf(
>> input_types=[DataTypes.INT(), DataTypes.INT()],
>> result_type=DataTypes.BIGINT(),
>> )
>> def add(i, j):
>> return i + j
>>
>>
>> TRANSFORM = """
>> INSERT INTO aiinfra.mysink
>> SELECT ADD(a, b)
>> FROM aiinfra.mysource
>> """
>>
>> CREATE_CATALOG = """
>> CREATE CATALOG hive WITH (
>> 'type' = 'hive',
>> 'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
>> )"""
>>
>> USE_CATALOG = "USE CATALOG hive"
>>
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
>> t_env = StreamTableEnvironment.create(
>> stream_execution_environment=exec_env, environment_settings=env_settings
>> )
>>
>> t_env.create_temporary_function("add", add)
>>
>> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
>> t_env.execute_sql(CREATE_CATALOG)
>> t_env.execute_sql(USE_CATALOG)
>>
>> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
>> t_result = t_env.execute_sql(TRANSFORM)
>> ```
>>
>> However, when I submit the python file to my flink cluster, it throws
>> exception:
>>
>> ```
>> [INFO] 2021-05-18 17:27:47.758  -
>> [taskAppId=TASK-90019-86729-380519]:[152] -  -> Traceback (most recent call
>> last):
>>  File "aiinfra/batch_example.py", line 50, in 
>>t_result = t_env.execute_sql(TRANSFORM)
>>  File
>> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py",
>> line 766, in execute_sql
>>  File
>> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>> line 1286, in __call__
>>  File
>> "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>> line 147, in deco
>>  File
>> "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o4.executeSql.
>> : org.apache.flink.table.api.ValidationException: SQL validation failed.
>> From line 3, column 8 to line 3, column 16: No match found for function
>> signature ADD(, )
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> 
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>> at
>> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>> at
>> org.apache.flink.api.python

Re: SIGSEGV error

2021-05-18 Thread Till Rohrmann
Great to hear that you fixed the problem by specifying an explicit
serializer for the state.

Cheers,
Till

On Tue, May 18, 2021 at 9:43 AM Joshua Fan  wrote:

> Hi Till,
> I also tried the job without gzip, it came into the same error.
> But the problem is solved now. I was about to give up to solve it, I found
> the mail at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/JVM-crash-SIGSEGV-in-ZIP-GetEntry-td17326.html.
> So I think maybe it was something about the serialize staff.
> What I have done is :
> before:
>
> OperatorStateStore stateStore = context.getOperatorStateStore();
> ListStateDescriptor lsd = new 
> ListStateDescriptor("bucket-states",State.class);
>
> after:
>
> OperatorStateStore stateStore = context.getOperatorStateStore();
> ListStateDescriptor lsd = new ListStateDescriptor("bucket-states",new 
> JavaSerializer());
>
> Hope this is helpful.
>
> Yours sincerely
> Josh
>
>
>
> Till Rohrmann  于2021年5月18日周二 下午2:54写道:
>
>> Hi Joshua,
>>
>> could you try whether the job also fails when not using the gzip format?
>> This could help us narrow down the culprit. Moreover, you could try to run
>> your job and Flink with Java 11 now.
>>
>> Cheers,
>> Till
>>
>> On Tue, May 18, 2021 at 5:10 AM Joshua Fan 
>> wrote:
>>
>>> Hi all,
>>>
>>> Most of the posts says that "Most of the times, the crashes in
>>> ZIP_GetEntry occur when the jar file being accessed has been
>>> modified/overwritten while the JVM instance was running. ", but do not
>>> know when and which jar file was modified according to the job running in
>>> flink.
>>>
>>> for your information.
>>>
>>> Yours sincerely
>>> Josh
>>>
>>> Joshua Fan  于2021年5月18日周二 上午10:15写道:
>>>
 Hi Stephan, Till

 Recently, I tried to upgrade a flink job from 1.7 to 1.11,
 unfortunately, the weird problem appeared, " SIGSEGV (0xb) at
 pc=0x0025, pid=135306, tid=140439001388800".  The pid log is
 attached.
 Actually, it is a simple job that consumes messages from kafka and
 writes into hdfs with a gzip format. It can run in 1.11 for about 2
 minutes, then the JVM will crash, then job restart and jvm crash again
 until the application fails.
 I also tried to set -Dsun.zip.disableMemoryMapping=true,but it turns
 out helpless, the same crash keeps happening. Google suggests to upgrade
 jdk to jdk1.9, but it is not feasible.
 Any suggestions? Thanks a lot.

 Yours sincerely
 Josh

 Stephan Ewen  于2019年9月13日周五 下午11:11写道:

> Given that the segfault happens in the JVM's ZIP stream code, I am
> curious if this is a bug in Flink or in the JVM core libs that happens to
> be triggered now by newer versions of Flink.
>
> I found this on StackOverflow, which looks like it could be related:
> https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
> Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"
> ?
>
>
> On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann 
> wrote:
>
>> Hi Marek,
>>
>> could you share the logs statements which happened before the SIGSEGV
>> with us? They might be helpful to understand what happened before.
>> Moreover, it would be helpful to get access to your custom serializer
>> implementations. I'm also pulling in Gordon who worked on
>> the TypeSerializerSnapshot improvements.
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj 
>> wrote:
>>
>>> Hi everyone,
>>>
>>> Recently we decided to upgrade from flink 1.7.2 to 1.8.1. After an
>>> upgrade our task managers started to fail with SIGSEGV error from time 
>>> to
>>> time.
>>>
>>> In process of adjusting the code to 1.8.1, we noticed that there
>>> were some changes around TypeSerializerSnapshot interface and its
>>> implementations. At that time we had a few custom serializers which we
>>> decided to throw out during migration and then leverage flink default
>>> serializers. We don't mind clearing the state in the process of 
>>> migration,
>>> an effort to migrate with state seems to be not worth it.
>>>
>>> Unfortunately after running new version we see SIGSEGV errors from
>>> time to time. It may be that serialization is not the real cause, but at
>>> the moment it seems to be the most probable reason. We have not 
>>> performed
>>> any significant code changes besides serialization area.
>>>
>>> We run job on yarn, hdp version 2.7.3.2.6.2.0-205.
>>> Checkpoint configuration: RocksDB backend, not incremental, 50s min
>>> processing time
>>>
>>> You can find parts of JobManager log and ErrorFile log of failed
>>> container included below.
>>>
>>> Any suggestions are welcome
>>>
>>> Best regards
>>> Marek Maj
>>>
>>> jobmanager.log
>>>
>>> 2019-09-10 16:30:28.177 INFO  

Re: Helm chart for Flink

2021-05-18 Thread Austin Cawley-Edwards
Hey all,

Yeah, I'd be interested to see the Helm pre-upgrade hook setup, though I'd
agree with you, Alexey, that it does not provide enough control to be a
stable solution.

@Pedro Silva  I don't know if there are talks for an
official operator yet, but Kubernetes support is important to the community
and has heavy investment, as seen on the Roadmap[1], so it will only get
better! Currently, the recommended way to get started on Kubernetes is with
the "Native Kubernetes" resource manager[2], which allows running Flink
Applications on Kubernetes from the command line and might work for you.

Hope that helps!
Austin

[1]: https://flink.apache.org/roadmap.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes

On Mon, May 17, 2021 at 7:31 PM Alexey Trenikhun  wrote:

> I think it should be possible to use a Helm pre-upgrade hook to take a
> savepoint and stop the currently running job, and then Helm will upgrade the
> image tags. The problem is that if you hit a timeout while taking the
> savepoint, it is not clear how to recover from this situation.
>
> Alexey
> --
> *From:* Austin Cawley-Edwards 
> *Sent:* Monday, May 17, 2021 1:02 PM
> *To:* Pedro Silva
> *Cc:* user
> *Subject:* Re: Helm chart for Flink
>
> Hi Pedro,
>
> There is currently no official Kubernetes Operator for Flink and, by
> extension, there is no official Helm chart. It would be relatively easy to
> create a chart for simply deploying standalone Flink resources via the
> Kubernetes manifests described here[1], though it would leave out the
> ability to upgrade your Flink application via Helm.
>
> If you need upgrade capabilities (which most people do) *and* need to use
> Helm, the Kubernetes Operator approach is the only option for an
> "all-in-one" experience. In addition to the GCP Operator you mentioned,
> there's also a Helm chart for Lyft's Operator by lightbend[2] as well as an
> operator for the Ververica Platform with support for Helm that I've built
> here[3].
>
>
> Are you already running Flink on Kubernetes, or just looking to get
> started easily?
>
> Best,
> Austin
>
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/
> [2]: https://github.com/lightbend/flink-operator
> [3]:
> https://github.com/fintechstudios/ververica-platform-k8s-operator/blob/master/docs/guides/deployment.md
>
>
> On Mon, May 17, 2021 at 11:01 AM Pedro Silva 
> wrote:
>
>> Hello,
>>
>> Forwarding this question from the dev mailing list in case this is a more
>> appropriate list.
>>
>> Does flink have an official Helm Chart? I haven't been able to find any,
>> the closest most up-to-date one seems to be
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator.
>> Is this correct or is there a more mature and/or recommended helm chart to
>> use?
>>
>> Thank you.
>>
>


Fastest way for decent lookup JOIN?

2021-05-18 Thread Theo Diefenthal
Hi there, 

I have the following (probably very common) use case: I have some lookup data 
(100 million records) which changes only slowly (in the range of some thousands 
of updates per day). My event stream is in the order of tens of billions of 
events per day, and each event needs to be enriched from the 100-million-record 
lookup source. For the JOIN, I don't need any event-time-related logic; just the 
newest version at the time of enrichment shall be taken into account. 

As a user used to the DataStream API but unfamiliar with the SQL API, I built a 
small MVP. I used a connected stream and put the enrichment data into keyed 
(heap) state. My RAM is enough to hold all the data in memory (once in prod at 
least). I first streamed in all 100 million records, then I started the 
performance measurement by streaming in just 3 million events to be enriched 
against the 100 million records. I was a bit stunned that the enrichment of all 
events took about 40 seconds on my local machine. I built a similar MVP in 
Spark where I put the 100 million records into a Hive table (pre-partitioned on 
the JOIN column) and the 3 million test events into a Parquet file, and then ran 
an outer join, which also took about 40 seconds on my local machine (consuming 
only 16GB of RAM). I somehow expected Flink to be much faster, since I already 
hold the enrichment data in memory (state) and, at least on localhost, there 
is no real networking involved. 

I then thought about the problems with the DataStream API: my 100 million 
records are read from an uncompressed CSV file which is 25GB in size. 
Deserialized to Java POJOs, I guess the POJOs would take 100GB of heap space. 
[Actually, I ran the tests in Spark with all 100 million records and this Flink 
test with only 20 million records due to too much memory being used, so the 
100GB is an estimate from 20 million records taking 20GB of heap space.] When I 
stopped parsing my enrichment data into POJOs and instead extracted only the 
enrichment (join) attribute, keeping the remaining part of the data as a simple 
string, the Java heap taken was only about 25GB again for all 100 million 
records. Not only that, my enrichment JOIN now took only 30 seconds to complete 
all records. My thought now is: I probably shouldn't use the DataStream API with 
Java POJOs here, but the Flink SQL API with "Row" classes? I remember I once 
read some blog about how Flink internally optimizes its data structures and can 
reuse certain things when using the SQL API, and so on. 

Before I go and try out several variants now, my question is: what do you 
think is the fastest/most efficient way to enrich slowly changing data with the 
latest version (processing-time temporal table JOIN), given that memory isn't a 
big problem once deployed to the cluster? Do you recommend using the SQL API? 
With which type of JOIN (processing-time temporal table?), and can I hold the 
enrichment table fully in Flink managed memory (can I express this via the SQL 
API?), or do I need to use some external "LookupTableSource"? Once I run my 
application in the cluster, I suspect a "LookupTableSource" would introduce some 
communication overhead vs. querying Flink state directly. If you recommend the 
DataStream API: should I read via SQL connectors and work with "Rows" in state? 
What kind of performance tunings should I take into account here (reuseObjects, 
disableChaining, ...)? 
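
For concreteness, the SQL variant I have in mind is roughly the following sketch
(table/column names are made up, connector options are elided, and I am not sure
yet whether the enrichment side has to be backed by a LookupTableSource or
whether the planner can keep it fully in managed memory/state):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class EnrichmentJoinSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

        // Event stream with a processing-time attribute for the temporal join.
        // Connector options are omitted in this sketch.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  event_id STRING,"
                        + "  lookup_key STRING,"
                        + "  payload STRING,"
                        + "  proc_time AS PROCTIME()"
                        + ") WITH ('connector' = 'kafka')");

        // The slowly changing ~100M-record enrichment data.
        tEnv.executeSql(
                "CREATE TABLE enrichment ("
                        + "  lookup_key STRING,"
                        + "  attribute STRING"
                        + ") WITH ('connector' = 'jdbc')");

        // Processing-time temporal join: every event is enriched with the version
        // of the enrichment row that is current at the time of processing.
        Table enriched = tEnv.sqlQuery(
                "SELECT e.event_id, e.payload, d.attribute "
                        + "FROM events AS e "
                        + "JOIN enrichment FOR SYSTEM_TIME AS OF e.proc_time AS d "
                        + "ON e.lookup_key = d.lookup_key");

        enriched.execute().print();
    }
}
```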

Best regards 
Theo 


Issue reading from S3

2021-05-18 Thread Angelo G.
Hi,

I'm trying to read from and write to S3 with Flink 1.12.2. I'm submitting
the job to local cluster (tar.gz distribution). I do not have a Hadoop
installation running in the same machine. S3 (not Amazon) is running in a
remote location and I have access to it via endpoint and access/secret keys.

The issue is that I'm able to read and write from and to S3 when
using StreamExecutionEnvironment.readTextFile and DataStrean.writeAsText
methods but I can't read from S3 when using the table API.

This is the application:

package org.apache.flink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ReadTables {

    public static void main(String[] args) throws Exception {

        // CLASSIC API (PLAIN TEXT)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> ds = env.readTextFile("s3a://bucket/source.txt");

        ds.writeAsText("s3a://bucket/dest.txt", FileSystem.WriteMode.OVERWRITE);

        env.execute();

        // TABLE API
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().useBlinkPlanner().build();

        TableEnvironment t_env = TableEnvironment.create(settings);

        t_env.getConfig().getConfiguration().setString("parallelism.default", "1");

        t_env.executeSql("CREATE TABLE example (`date` STRING, `value` INT) " +
                "WITH ('connector' = 'filesystem', 'path' = 's3a://bucket/xxx/yyy/', 'format' = 'parquet')");

        Table t1 = t_env.from("example");

        t1.execute().print();
    }
}


The first job works properly, reading the source.txt file and writing it to
dest.txt.

The second job does not work:

$~/flink-1.12.2$ ./bin/flink run -Dexecution.runtime-mode=BATCH -c
org.apache.flink.ReadTables flink-s3-1.0-SNAPSHOT.jar;

Job has been submitted with JobID c690faed0051d1501d5b9747b56f
Program execution finished
Job with JobID c690faed0051d1501d5b9747b56f has finished.
Job Runtime: 17358 ms

Job has been submitted with JobID ebe54017faa83af33923d50892283e11
++-+
|   date |   value |
++-+


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to fetch next result
Caused by: java.lang.RuntimeException: Failed to fetch next result
Caused by: java.io.IOException: Failed to fetch job execution result
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.client.program.ProgramInvocationException: Job failed
(JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job
failed (JobID: ebe54017faa83af33923d50892283e11)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
Caused by: java.lang.RuntimeException: One or more fetchers have
encountered exception
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
Caused by: java.net.SocketTimeoutException: doesBucketExist on
scib-des-cm-fipoac-medusa: com.amazonaws.AmazonClientException: No AWS
Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
com.amazonaws.SdkClientException: Failed to connect to service endpoint:
Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided
by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider
InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException:
Failed to connect to service endpoint:
Caused by: com.amazonaws.SdkClientException: Failed to connect to service
endpoint:
Caused by: java.net.SocketTimeoutException: connect timed out

I have the access credentials configured in flink-conf.yaml file:

s3a.endpoint: http://s3.xxx
s3a.path-style: true
s3a.access-key: x
s3a.secret-key: x
s3a.entropy.key: _entropy_
s3a.entropy.length: 4
s3a.region: s3
s3a.bucket: x

I copied the flink-s3-fs-hadoop jar into the plugins folder, but I had to add
it as a dependency (not provided) to the pom; otherwise an S3AFileSystem
'class not found' exception arises.

Thank you for your help,

Angelo.


Re: After upgrade from 1.11.2 to 1.13.0 parameter taskmanager.numberOfTaskSlots set to 1.

2021-05-18 Thread Alexey Trenikhun
If flink-conf.yaml is read-only, will Flink complain but still work fine?


From: Chesnay Schepler 
Sent: Wednesday, May 12, 2021 5:38 AM
To: Alex Drobinsky 
Cc: user@flink.apache.org 
Subject: Re: After upgrade from 1.11.2 to 1.13.0 parameter 
taskmanager.numberOfTaskSlots set to 1.


The contents of FLINK_PROPERTIES are piped as-is into the flink configuration, 
and thus require the same format as the configuration.
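
For illustration, a docker-compose snippet in the style of the Flink Docker documentation that passes configuration through FLINK_PROPERTIES (the image tag and values are only examples):

services:
  taskmanager:
    image: flink:1.13.0-scala_2.11-java11
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 12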

On 5/12/2021 2:36 PM, Alex Drobinsky wrote:
Thanks a lot !
I used TASK_MANAGER_NUMBER_OF_TASK_SLOTS in my docker-compose.yml, it works 
perfectly :)
In which format can I provide parameters via FLINK_PROPERTIES? I'm thinking of abandoning the idea of copying flink-conf in the Dockerfile.
Is it limited to a specific set of parameters, or is it generic?

On Wed, May 12, 2021 at 15:20, Chesnay Schepler  wrote:
You could also configure the number of slots via the 
TASK_MANAGER_NUMBER_OF_TASK_SLOTS environment variable.

On 5/12/2021 2:19 PM, Chesnay Schepler wrote:
I believe this is due to FLINK-21037; we did not consider the possibility of 
users mounting the configuration directly, and instead assumed that 
modifications to the config always go through the FLINK_PROPERTIES environment 
variable.

That would also be the workaround for your issue.

On 5/12/2021 2:06 PM, Alex Drobinsky wrote:
Dear flink community,

First I need provide some minimum information about my deployment scenario:
I'm running application inside of Flink docker, below original Dockerfile:
---

FROM flink:1.13.0-scala_2.11-java11

# Copy log and monitoring related JARs to flink lib dir
COPY kafka-clients-2.4.1.jar /opt/flink/lib/

RUN chmod 777 /tmp
RUN apt-get update && apt-get install -y htop

# configuration files
COPY Log4cpp.properties /opt/flink/
COPY Log4j.properties /opt/flink/conf/log4j.properties
COPY SessionOrganizer.json /opt/flink/
COPY flink-conf.yaml /opt/flink/conf/
COPY slaves /opt/flink/conf/

# job file
COPY KafkaToSessions-shade.jar /opt/flink/lib/

# libraries
ADD libs /usr/local/lib/

# Add /usr/local/lib to ldconfig
RUN echo "/usr/local/lib/" > /etc/ld.so.conf.d/ips.conf && \
ldconfig && \
ulimit -c 0

RUN mkdir /opt/flink/ip-collection/ && \
mkdir /opt/flink/checkpoints/ && \
mkdir /opt/flink/ip-collection/incorrectIcs && \
mkdir /opt/flink/ip-collection/storage && \
mkdir /opt/flink/ip-collection/logs

CMD /opt/flink/bin/start-cluster.sh && /opt/flink/bin/flink run 
/opt/flink/lib/KafkaToSessions-shade.jar

---
If we will ignore irrelevant parts of Dockerfile, the only 2 things remains ( 
beside FROM statement)
1. Overwritten flink-conf.yml + slaves
2. CMD which executes start-cluster and job.

My flink-conf.yml:
-

rest.address: localhost
rest.port: 8088
state.backend: filesystem
state.checkpoints.dir: file:///opt/flink/checkpoints
jobmanager.memory.process.size: 2224m
jobmanager.rpc.port: 6123
jobmanager.rpc.address: localhost
taskmanager.memory.flink.size: 2224m
taskmanager.memory.task.heap.size: 1000m
taskmanager.numberOfTaskSlots: 12
taskmanager.rpc.port: 50100
taskmanager.data.port: 5
parallelism.default: 6
heartbeat.timeout: 12
heartbeat.interval: 2
env.java.opts: "-XX:+UseG1GC -XX:MaxGCPauseMillis=300"


-

The slaves file contains a single line with localhost.
After starting the Docker container, I noticed that the application doesn't work due to a lack of slots. When I checked flink-conf.yml, I noticed that taskmanager.numberOfTaskSlots is set to 1.
P.S. The first time, daemon.sh complained that it doesn't have write permissions to change flink-conf.yml; when I added chown flink.flink /opt/flink/flink-conf.yml, it stopped complaining and the taskmanager.numberOfTaskSlots change occurred.

Any suggestions ?

Best regards,
Alexander







Prometheus Reporter Enhancement

2021-05-18 Thread Mason Chen
Hi all,

Would people appreciate enhancements to the prometheus reporter to include 
extra labels via a configuration, as a contribution to Flink? I can see it 
being useful for adding labels that are not job specific, but infra specific.

The change would be nicely integrated with Flink's ConfigOptions and unit tested.

Best,
Mason


Re: Prometheus Reporter Enhancement

2021-05-18 Thread Andrew Otto
Sounds useful!

On Tue, May 18, 2021 at 2:02 PM Mason Chen  wrote:

> Hi all,
>
> Would people appreciate enhancements to the prometheus reporter to include
> extra labels via a configuration, as a contribution to Flink? I can see it
> being useful for adding labels that are not job specific, but infra
> specific.
>
> The change would be nicely integrated with the Flink’s ConfigOptions and
> unit tested.
>
> Best,
> Mason
>


Re: Prometheus Reporter Enhancement

2021-05-18 Thread Chesnay Schepler
There is already a ticket for this. Note that this functionality should 
be implemented in a generic fashion to be usable for all reporters.


https://issues.apache.org/jira/browse/FLINK-17495

On 5/18/2021 8:16 PM, Andrew Otto wrote:

Sounds useful!

On Tue, May 18, 2021 at 2:02 PM Mason Chen  wrote:


Hi all,

Would people appreciate enhancements to the prometheus reporter to
include extra labels via a configuration, as a contribution to
Flink? I can see it being useful for adding labels that are not
job specific, but infra specific.

The change would be nicely integrated with the Flink’s
ConfigOptions and unit tested.

Best,
Mason





Re: RabbitMQ source does not stop unless message arrives in queue

2021-05-18 Thread Austin Cawley-Edwards
Hey all,

Thanks for the details, John! Hmm, that doesn't look too good either 😬 but
probably a different issue with the RMQ source/ sink. Hopefully, the new
FLIP-27 sources will help you guys out there! The upcoming HybridSource in
FLIP-150 [1] might also be interesting to you in finely controlling sources.

@Jose Vargas  I've created FLINK-22698 [2] to
track your issue. Do you have a small reproducible case/ GitHub repo? Also,
would you be able to provide a little bit more about the Flink job that you
see this issue in? i.e. overall parallelism, the parallelism of the
sources/ sinks, checkpointing mode.

Best,
Austin

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source
[2]: https://issues.apache.org/jira/browse/FLINK-22698

On Thu, May 13, 2021 at 9:25 PM John Morrow 
wrote:

> Hi Jose, hey Austin!!
>
> I know we were just recently looking at trying to consume a fixed number
> of messages from an RMQ source, process them and output them to an RMQ
> sink. As a naive first attempt at stopping the job when the target number
> of messaged had been processed, we put a counter state in the process
> function and tried throwing an exception when the counter >= the target
> message count.
>
> The job had:
>
>- parallelism: 1
>- checkpointing: 1000 (1 sec)
>- restartStrategy: noRestart
>- prefetchCount: 100
>
> Running it with 150 messages in the input queue and 150 also as the target
> number, at the end the queues had:
>
>- output queue - 150
>- input queue - 50
>
> So it looks like it did transfer all the messages, but some unack'd ones
> also got requeued back at the source, so they end up as duplicates. I know
> throwing an exception in the Flink job is not the same as triggering a
> stateful shutdown, but it might be hitting similar unack issues.
>
> John
>
> --
> *From:* Austin Cawley-Edwards 
> *Sent:* Thursday 13 May 2021 16:49
> *To:* Jose Vargas ; John Morrow <
> johnniemor...@hotmail.com>
> *Cc:* user 
> *Subject:* Re: RabbitMQ source does not stop unless message arrives in
> queue
>
> Hey Jose,
>
> Thanks for bringing this up – it indeed sounds like a bug. There is
> ongoing work to update the RMQ source to the new interface, which might
> address some of these issues (or should, if it is not already), tracked in
> FLINK-20628[1]. Would you be able to create a JIRA issue for this, or would
> you like me to?
>
> At my previous company, we only consumed one Rabbit queue per application,
> so we didn't run into this exactly but did see other weird behavior in the
> RMQ source that could be related. I'm going to cc @John Morrow
>  who might be able to contribute to what he's
> seen working with the source, if he's around. I remember some messages not
> properly being ack'ed during a stateful shutdown via the Ververica
> Platform's stop-with-savepoint functionality that you mention, though that
> might be more related to FLINK-20244[2], perhaps.
>
>
> Best,
> Austin
>
> [1]: https://issues.apache.org/jira/browse/FLINK-20628
> [2]: https://issues.apache.org/jira/browse/FLINK-20244
>
> On Thu, May 13, 2021 at 10:23 AM Jose Vargas 
> wrote:
>
> Hi,
>
> I am using Flink 1.12 to read from and write to a RabbitMQ cluster.
> Flink's RabbitMQ source has some surprising behavior when a
> stop-with-savepoint request is made.
>
> *Expected Behavior:*
> The stop-with-savepoint request stops the job with a FINISHED state.
>
> *Actual Behavior:*
> The stop-with-savepoint request either times out or hangs indefinitely
> unless a message arrives in all the queues that the job consumes from after
> the stop-with-savepoint request is made.
>
>
> I know that one possible workaround is to send a sentinel value to each of
> the queues consumed by the job that the deserialization schema checks in
> its isEndOfStream method. However, this is somewhat cumbersome and
> complicates the continuous delivery of a Flink job. For example,
> Ververica Platform will trigger a stop-with-savepoint for the user if one
> of many possible Flink configurations for a job are changed. The
> stop-with-savepoint can then hang indefinitely because only some of the
> RabbitMQ sources will have reached a FINISHED state.
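
A minimal sketch of the sentinel-value workaround mentioned above, assuming string payloads; the class name and the sentinel value are made up:

import org.apache.flink.api.common.serialization.SimpleStringSchema;

public class SentinelAwareSchema extends SimpleStringSchema {

    // Hypothetical end-of-stream marker; a producer has to send it to every consumed queue.
    private static final String SENTINEL = "__END_OF_STREAM__";

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Returning true lets the RabbitMQ source reach FINISHED once the sentinel arrives.
        return SENTINEL.equals(nextElement);
    }
}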
>
> I have attached the TaskManager thread dump after the save-with-savepoint
> request was made. Most every thread is either sleeping or waiting around
> for locks to be released, and then there are a handful of threads trying to
> read data from a socket via the com.rabbitmq.client.impl.Frame.readFrom
> method.
>
> Ideally, once a stop-with-savepoint request is made, the threads trying to
> read data from RabbitMQ would be interrupted so that all RabbitMQ sources
> would reach a FINISHED state.
>
> Regular checkpoints and savepoints complete successfully, it is only the
> stop-with-savepoint request where I see this behavior.
>
>
> Respectfully,
>
>
> Jose Vargas
>
> Software Engineer, Data Engineering
>
> E: jose.var...@fiscalnote.com
>

Guidance for Integration Tests with External Technologies

2021-05-18 Thread Rion Williams
Hey all,

I’ve been taking a very TDD-oriented approach to developing many of the Flink 
apps I’ve worked on, but recently I’ve encountered a problem that has me 
scratching my head.

A majority of my integration tests leverage a few external technologies such as 
Kafka and typically a relational database like Postgres. I’ve found 
in-memory/embedded versions of these that have worked well in the past to allow 
me to:

- send messages into a kafka topic
- run my exact Flink job asynchronously 
- verify my results / assertions in Postgres via awaitility
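
A generic sketch of that last verification step, polling Postgres with Awaitility until the sink has written the expected rows (Awaitility 4 / JUnit 5 style; the JDBC URL, table name and expected count are placeholders):

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.time.Duration;

class SinkAssertions {

    // Poll the sink table until it contains the expected number of rows or the timeout is hit.
    static void assertRowCount(String jdbcUrl, String table, int expected) {
        await().atMost(Duration.ofSeconds(60)).untilAsserted(() -> {
            try (Connection conn = DriverManager.getConnection(jdbcUrl);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table)) {
                rs.next();
                assertEquals(expected, rs.getInt(1));
            }
        });
    }
}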

Recently, I had a use case for Broadcast state in a job and found that my tests would run successfully when executed individually, but when multiple tests are run in sequence (in the same file), it seems that Flink fails to consume from the topics and the assertion eventually fails. 

I’ve tried several approaches including:
- ensuring that each Flink job is passed a unique consumer.id / group.id / 
application.id
- ensuring each test has brand new Kafka topics specific for it
- spinning up a new Flink cluster / Kafka cluster / Postgres instance per test

I’m not entirely sure what could be causing the problem but it only occurs for 
Flink jobs that read from two topics and leverage broadcast state. All other 
integration tests that use Kafka/Flink/Postgres still pass and can be run in 
sequence.

Any advice / examples / recommendations would be helpful. l’d be happy to 
elaborate and provide code whenever possible as well.

Thanks,

Rion



DataStream Batch Execution Mode and large files.

2021-05-18 Thread Marco Villalobos
Hi,

I am using the DataStream API in Batch Execution Mode, and my "source" is
an s3 Buckets with about 500 GB of data spread across many files.

Where does Flink store the results of processed / produced data between
tasks?

There is no way that 500GB will fit in memory.  So I am very curious how
that happens.

Can somebody please explain?

Thank you.

Marco A. Villalobos


DataStream API Batch Execution Mode restarting...

2021-05-18 Thread Marco Villalobos
I have a DataStream running in Batch Execution mode within YARN on EMR.
My job failed an hour in, two times in a row, because the task manager heartbeat timed out.

Can somebody point me to how to restart a job in this situation? I can't find that section of the documentation.

thank you.


Re: Root Exception can not be shown on Web UI in Flink 1.13.0

2021-05-18 Thread Gary Wu
Thanks! I have updated the details and the task manager log in
https://issues.apache.org/jira/browse/FLINK-22688.

Regards,
-Gary

On Tue, 18 May 2021 at 16:22, Matthias Pohl  wrote:

> Sorry, for not getting back earlier. I missed that thread. It looks like
> some wrong assumption on our end. Hence, Yangze and Guowei are right. I'm
> gonna look into the issue.
>
> Matthias
>
> On Fri, May 14, 2021 at 4:21 AM Guowei Ma  wrote:
>
>> Hi, Gary
>>
>> I think it might be a bug. So would you like to open a jira for this.
>> And could you share the exception ,which the TaskManagerLocation is null?
>> It might be very helpful to verify the cause.
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, May 13, 2021 at 10:36 AM Yangze Guo  wrote:
>>
>>> Hi, it seems to be related to FLINK-22276. Thus, I'd involve Matthias
>>> to take a look.
>>>
>>> @Matthias My gut feeling is that not all execution who has failureInfo
>>> has been deployed?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Wed, May 12, 2021 at 10:12 PM Gary Wu  wrote:
>>> >
>>> > Hi,
>>> >
>>> > We have upgraded our Flink applications to 1.13.0, but we found that the
>>> Root Exception cannot be shown on the Web UI due to an internal server error
>>> message. After opening the browser development console and tracing the message,
>>> we found that there is an exception in the jobmanager:
>>> >
>>> > 2021-05-12 13:30:45,589 ERROR
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler [] -
>>> Unhandled exception.
>>> > java.lang.IllegalArgumentException: The location must not be null for
>>> a non-global failure.
>>> > at
>>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.assertLocalExceptionInfo(JobExceptionsHandler.java:218)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createRootExceptionInfo(JobExceptionsHandler.java:191)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>>> ~[?:?]
>>> > at java.util.stream.SliceOps$1$1.accept(SliceOps.java:199) ~[?:?]
>>> > at
>>> java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1632)
>>> ~[?:?]
>>> > at
>>> java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
>>> ~[?:?]
>>> > at
>>> java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
>>> ~[?:?]
>>> > at
>>> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488) ~[?:?]
>>> > at
>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>>> ~[?:?]
>>> > at
>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>>> ~[?:?]
>>> > at
>>> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
>>> > at
>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>>> ~[?:?]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionHistory(JobExceptionsHandler.java:169)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.createJobExceptionsInfo(JobExceptionsHandler.java:154)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:101)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler.handleRequest(JobExceptionsHandler.java:63)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$0(AbstractExecutionGraphHandler.java:87)
>>> ~[flink-dist_2.12-1.13.0.jar:1.13.0]
>>> > at
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
>>> [?:?]
>>> > at
>>> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>>> [?:?]
>>> > at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> [?:?]
>>> > at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>>> > at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>>> [?:?]
>>> > at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> [?:?]
>>> > at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> [?:?]
>>> > at java.lang.Thread.run(Thread.java:834) [?:?]
>>> >
>>> > We would like to check: is there any configuration change that should be
>>> done for the application? Thanks!
>>> >
>>> > Regards,
>>> > -Gary
>>> >

Re: Re: Handling "Global" Updating State

2021-05-18 Thread Yun Gao
Hi Rion,

Sorry for the late reply. Another, simpler method might indeed be to have the operator read the data directly from Kafka in initializeState to initialize its state.
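
A rough sketch of that idea: a BroadcastProcessFunction that also implements CheckpointedFunction and, on a fresh start, drains the lookup topic with a plain KafkaConsumer inside initializeState. The topic name, bootstrap servers, the key/value handling and the String types are all placeholders, and this is a workaround rather than an official pattern:

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class PrepopulatedEnrichmentFunction
        extends BroadcastProcessFunction<String, String, String>
        implements CheckpointedFunction {

    public static final MapStateDescriptor<String, String> CONFIG_DESC =
            new MapStateDescriptor<>("config", Types.STRING, Types.STRING);

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        BroadcastState<String, String> state =
                ctx.getOperatorStateStore().getBroadcastState(CONFIG_DESC);
        if (ctx.isRestored()) {
            return; // state comes from the checkpoint/savepoint
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor("lookup-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            // Drain the topic up to its current end and load everything into broadcast state.
            while (partitions.stream().anyMatch(tp -> consumer.position(tp) < end.get(tp))) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    state.put(rec.key(), rec.value());
                }
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // Broadcast state is snapshotted automatically; nothing extra to do here.
    }

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Keep the state up to date from the lookup topic; "key=value" parsing is a placeholder.
        String[] kv = value.split("=", 2);
        ctx.getBroadcastState(CONFIG_DESC).put(kv[0], kv.length > 1 ? kv[1] : "");
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Example filter: only forward events that have a mapping in the broadcast state.
        if (ctx.getBroadcastState(CONFIG_DESC).contains(value)) {
            out.collect(value);
        }
    }
}

The main caveat is that the drain in initializeState runs in every parallel subtask on a fresh start, so it is duplicated work; on recovery, the isRestored check skips it and the state is taken from the checkpoint instead.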

Best,
Yun



 --Original Mail --
Sender:Rion Williams 
Send Date:Mon May 17 19:53:35 2021
Recipients:Yun Gao 
CC:user 
Subject:Re: Handling "Global" Updating State
Hi Yun,

That’s very helpful and good to know that the problem/use-case has been thought 
about. Since my need is probably shorter-term than later, I’ll likely need to 
explore a workaround.

Do you know of an approach that might not require the use of check pointing and 
restarting? I was looking into exploring initializeState within my 
broadcast-side stream to get it current and then simply listening to the Kafka 
topic as records come in. I’d imagine this would work, but that may be a bit of 
a naive approach.

Thanks!

Rion 

On May 17, 2021, at 1:36 AM, Yun Gao  wrote:



Hi Rion, 

I think FLIP-150[1] should be able to solve this scenario.

Since FLIP-150 is still under discussion, for now a temporary method that comes to mind might be:
1. Write a first job to read the Kafka topic and update the broadcast state of some operator. The job would keep the source alive after all the data is emitted (e.g. sleep forever), and when all the data is processed, stop the job with a savepoint.
2. Use the savepoint to start the original job. For the operator that requires the broadcast state, it could set the same uid and the same state name as the corresponding operator in the first job, so it can acquire the state content on startup.

Yun,
Best

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-150%3A+Introduce+Hybrid+Source


 --Original Mail --
Sender:Rion Williams 
Send Date:Mon May 17 07:00:03 2021
Recipients:user 
Subject:Re: Handling "Global" Updating State
Hey folks,

After digging into this a bit it does seem like Broadcast State would fit the 
bill for this scenario and keeping the downstream operators up-to-date as 
messages arrived in my Kafka topic.

My question is - is there a pattern for pre-populating the state initially? In 
my case, I need to have loaded all of my “lookup” topic into state before 
processing any records in the other stream.

My thought initially is to do something like this, if it’s possible:

- Create a KafkaConsumer on startup to read the lookup topic in its entirety 
into some collection like a hashmap (prior to executing the Flink pipeline to 
ensure synchronicity)
- Use this to initialize the state of my broadcast stream (if possible)
- At this point that stream would be broadcasting any new records coming in, so 
I “should” stay up to date at that point.

Is this an oversimplification or is there an obviously better / well known 
approach to handling this?

Thanks,

Rion

On May 14, 2021, at 9:51 AM, Rion Williams  wrote:



Hi all,

I've encountered a challenge within a Flink job that I'm currently working on. 
The gist of it is that I have a job that listens to a series of events from a 
Kafka topic and eventually sinks those down into Postgres via the JDBCSink.

A requirement recently came up for the need to filter these events based on 
some configurations that are currently being stored within another Kafka topic. 
I'm wondering what the best approach might be to handle this type of problem.

My initial naive approach was:

- When Flink starts up, use a regular Kafka Consumer and read all of the configuration data from that topic in its entirety.
- Store the messages from that topic in some type of thread-safe collection statically accessible by the operators downstream.
- Expose the thread-safe collection within the operators to actually perform the filtering.

This doesn't seem right though. I was reading about BroadcastState which seems 
like it might fit the bill (e.g. keep those mappings in Broadcast state so that 
all of the downstream operations would have access to them, which I'd imagine 
would handle keeping things up to date). 

Does Flink have a good pattern / construct to handle this? Basically, I have a 
series of mappings that I want to keep relatively up to date in a Kafka topic, 
and I'm consuming from another Kafka topic that will need those mappings to 
filter against.

I'd be happy to share some of the approaches I currently have or elaborate a 
bit more if that isn't super clear.

Thanks much,

Rion



Re: DataStream API Batch Execution Mode restarting...

2021-05-18 Thread Yun Gao
Hi Marco,

Have you configured the restart strategy? If restart-strategy [1] is configured to some strategy other than none, Flink should be able to restart the job automatically on failover. The restart strategy can also be configured via StreamExecutionEnvironment#setRestartStrategy.

If no restart strategy is configured (the default behavior), the job would fail and we would need to re-submit the job to execute it from scratch.
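
For example, a fixed-delay restart strategy can be set on the environment like this (the attempt count and delay are just example values):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart the job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
        // ... build the pipeline and call env.execute() as usual ...
    }
}

The same can be expressed in flink-conf.yaml via restart-strategy: fixed-delay together with restart-strategy.fixed-delay.attempts and restart-strategy.fixed-delay.delay.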

Best,
Yun




 --Original Mail --
Sender:Marco Villalobos 
Send Date:Wed May 19 11:27:37 2021
Recipients:user 
Subject:DataStream API Batch Execution Mode restarting...

I have a DataStream running in Batch Execution mode within YARN on EMR.
My job failed an hour in, two times in a row, because the task manager heartbeat timed out.

Can somebody point me to how to restart a job in this situation? I can't find that section of the documentation.

thank you.

Questions Flink DataStream in BATCH execution mode scalability advice

2021-05-18 Thread Marco Villalobos
Questions Flink DataStream in BATCH execution mode scalability advice.

Here is the problem that I am trying to solve.

Input is an S3 bucket directory with about 500 GB of data across many
files. The instance that I am running on only has 50GB of EBS storage. The
nature of this data is time series data. Imagine name, value, timestamp.

I must average the time_series.value by time_series.name on a tumbling
window of 15 minutes. Upon aggregation, the time_series.timestamp gets
rounded up to the quarter hour. I key by tag name and 15-minute interval.

After aggregation, I must forward fill the missing quarters for each
time_series.name. Currently, this forward fill operator is keyed only by
time_series.name. Does this mean that in batch mode, all of the time series
with the same time_series.name within the 500 GB of files must fit in
memory?

The results are saved in a rdbms.

If this job somehow reads all 500 GB before it sends it to the first
operator, where is the data stored?

Now considering that the EMR node only has 50GB of EBS (that's disk
storage), is there a means to configure Flink to store its intermediate
results within S3?

When the job failed, I saw this exception in the log: "Recovery is
suppressed by NoRestartBackoffTimeStrategy." Is there a way to configure
this to recover?

My job keeps on failing for the same reason, it says, "The heartbeat of
TaskManager with id container_xxx timed out." Is there a way to configure
it not to time out?

I would appreciate any advice on how I should solve these problems. Thank you.


Re: DataStream API Batch Execution Mode restarting...

2021-05-18 Thread Marco Villalobos
Thank you.  I used the default restart strategy.  I'll change that.

On Tue, May 18, 2021 at 11:02 PM Yun Gao  wrote:

> Hi Marco,
>
> Have you configured the restart strategy ? if the restart-strategy [1] is
> configuration
> into some strategies other than none, Flink should be able to restart the
> job automatically
> on failover. The restart strategy could also be configuration via
> StreamExecutionEnvironment#setRestartStrategy.
>
> If no restart strategy is configured (the default behavior), the job would
> failed and we would
> need to re-submit the job to execute it from the scratch.
>
> Best,
> Yun
>
>
>
> --Original Mail --
> *Sender:*Marco Villalobos 
> *Send Date:*Wed May 19 11:27:37 2021
> *Recipients:*user 
> *Subject:*DataStream API Batch Execution Mode restarting...
>
>> I have a DataStream running in Batch Execution mode within YARN on EMR.
>> My job failed an hour in, two times in a row, because the task manager
>> heartbeat timed out.
>>
>> Can somebody point me to how to restart a job in this situation? I can't
>> find that section of the documentation.
>>
>> thank you.
>>
>


Re: DataStream Batch Execution Mode and large files.

2021-05-18 Thread Yun Gao
Hi Marco,

With BATCH mode, all the ALL_TO_ALL edges would be marked as blocking and would use intermediate files to transfer data. Flink now supports hash shuffle and sort shuffle for blocking edges [1]; both of them store the intermediate files in the directories configured by io.tmp.dirs [2].


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/batch/blocking_shuffle/
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#io-tmp-dirs
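
For example, io.tmp.dirs can be pointed at a larger mounted volume in flink-conf.yaml (the path below is just an illustration):

io.tmp.dirs: /mnt/large-volume/flink-tmp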
 --Original Mail --
Sender:Marco Villalobos 
Send Date:Wed May 19 09:50:45 2021
Recipients:user 
Subject:DataStream Batch Execution Mode and large files.

Hi,

I am using the DataStream API in Batch Execution Mode, and my "source" is an s3 
Buckets with about 500 GB of data spread across many files.

Where does Flink store the results of processed / produced data between tasks?

There is no way that 500GB will fit in memory.  So I am very curious how that 
happens.

Can somebody please explain?

Thank you.

Marco A. Villalobos

flink all events getting dropped as late

2021-05-18 Thread Debraj Manna
Crossposting from stackoverflow


My flink pipeline looks like below

WatermarkStrategy watermarkStrategy = WatermarkStrategy

.forBoundedOutOfOrderness(Duration.ofSeconds(900))
.withTimestampAssigner((metric, timestamp) -> {
logger.info("ETS: mts: {}, ts: {}",
metric.metricPoint.timeInstant, timestamp);
return metric.metricPoint.timeInstant;
});

metricStream = kafkasource
.process(<>)
.assignTimestampsAndWatermarks(watermarkStrategy)
.transform("debugFilter",
TypeInformation.of(<>), new StreamWatermarkDebugFilter<>("Op"))
.filter(<>)
.map(<>)
.flatMap(<>)
.keyBy(<>)
.window(TumblingEventTimeWindows.of(Time.seconds(300)))
.allowedLateness(Time.seconds(900))
.sideOutputLateData(lateOutputTag)
.aggregate(AggregateFunction, ProcessWindowFunction)
.addSink()

I am running with parallelism 1 and the default setAutoWatermarkInterval of 200 ms. I did not set setStreamTimeCharacteristic, as from Flink 1.12 it is event time by default.

I am seeing that the watermark is progressing from the output of StreamWatermarkDebugFilter, but all the events are getting marked as late and are getting gathered at lateOutputTag.

2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,745 INFO  - ETS: mts:
162131010, ts: 1621310582271
2021-05-18 17:14:19,842 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130949
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
2021-05-18 17:14:19,944 INFO  - ETS: mts:
162130980, ts: 1621310582275
...
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,107 INFO  - ETS: mts:
162131038, ts: 1621310582278
2021-05-18 17:14:20,137 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162130977
2021-05-18 17:14:20,203 INFO  - ETS: mts:
162130980, ts: 1621310582279
...
2021-05-18 17:17:47,839 INFO  - ETS: mts:
162131010, ts: 1621310681159
2021-05-18 17:17:47,848 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131009
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
2021-05-18 17:17:47,958 INFO  - ETS: mts:
162130980, ts: 1621310681237
...
2021-05-18 17:22:24,207 INFO  - ETS: mts:
162131010, ts: 1621310703622
2021-05-18 17:22:24,229 INFO  StreamWatermarkDebugFilter - Op,
Watermark: 162131039
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177
2021-05-18 17:22:24,315 INFO  - ETS: mts:
162130980, ts: 1621310705177

I have seen this discussion and it is not an idleness problem.

It looks like it is related to this discussion. Can someone suggest how I can debug this problem further?


Re: DataStream Batch Execution Mode and large files.

2021-05-18 Thread Marco Villalobos
Thank you very much. You've been very helpful.

Since my intermediate results are large, I suspect that io.tmp.dirs must
literally be on the local file system. Thus, since I use EMR, I'll need to
configure EBS to support more data.

On Tue, May 18, 2021 at 11:08 PM Yun Gao  wrote:

> Hi Marco,
>
> With BATCH mode, all the ALL_TO_ALL edges would be marked as blocking
> and would use intermediate file to transfer data. Flink now support hash
> shuffle
> and sort shuffle for blocking edges[1], both of them stores the
> intermediate files in
> the directories configured by io.tmp.dirs[2].
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/batch/blocking_shuffle/
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#io-tmp-dirs
>
> --Original Mail --
> *Sender:*Marco Villalobos 
> *Send Date:*Wed May 19 09:50:45 2021
> *Recipients:*user 
> *Subject:*DataStream Batch Execution Mode and large files.
>
>> Hi,
>>
>> I am using the DataStream API in Batch Execution Mode, and my "source" is
>> an s3 Buckets with about 500 GB of data spread across many files.
>>
>> Where does Flink store the results of processed / produced data between
>> tasks?
>>
>> There is no way that 500GB will fit in memory.  So I am very curious how
>> that happens.
>>
>> Can somebody please explain?
>>
>> Thank you.
>>
>> Marco A. Villalobos
>>
>