Re: Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

2022-02-17 Thread Francesco Guardiani
Hi,
Filesystem source directory watching is going to be available from 1.15:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
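
For reference, in 1.15 the continuous monitoring is switched on with the
'source.monitor-interval' option of the filesystem connector. A minimal sketch based on
the linked page; the interval value is just an example, please double-check the option
name against the 1.15 docs:

CREATE TABLE events (
  `event_id` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = './src/main/resources/events/',
  'format' = 'json',
  'source.monitor-interval' = '10 s'
);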

FG

On Fri, Feb 18, 2022 at 1:28 AM M Singh  wrote:

> Hi:
>
> I have a simple application and am using file system connector to monitor
> a directory and then print to the console (using datastream).  However, the
> application stops after reading the file in the directory (at the moment I
> have a single file in the directory).   I am using Apache Flink version
> 1.14.3.
>
> I believe there is a configuration option to be used in the 'with' clause
> but I could not find the right config - I tried 'streaming-source.enable'
> = 'true' but that results in an exception.
>
> I have also tried using EnvironmentSettings in streaming mode (as shown
> below) but still the application stops after reading the file in the
> directory.
>
> Here is the code segment:
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class TestApplication {
>
> public static void main(String [] args) throws Exception {
> StreamExecutionEnvironment see =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(see,
> settings);
>
> tEnv.executeSql(
> "  CREATE TEMPORARY TABLE events (" +
> "  `event_id` STRING" +
> ")" +
> "WITH (" +
> "  'connector' = 'filesystem'," +
> "  'path' = './src/main/resources/events/'," +
> "  'format' = 'json'" +
> ")"
> );
>
> Table events = tEnv.sqlQuery(
> "SELECT * from events"
> );
> tEnv.toDataStream(events).print("events");
>
> see.execute();
> }
> }
>
> Here is the console output:
>
> events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
>
> Process finished with exit code 0
>
>
> Thanks
>


Re: Pyflink with pulsar

2022-02-17 Thread Luning Wong
The Pulsar Python source connector will be released in the 1.15 version.
If you want to use it right now, you could compile the master branch.
When I completed the Python connector code, I only tested the native
Pulsar protocol without KOP.

Usage examples are in the comments of the PulsarSource class and in the test
cases of the FlinkPulsarTest class.
You can find them in the following PR link.
https://github.com/apache/flink/pull/18388

Best regards,

Wong

Xingbo Huang wrote on Fri, Feb 18, 2022 at 14:32:
>
>
>
> -- Forwarded message -
> From: Ananth Gundabattula 
> Date: Thu, Feb 17, 2022 16:57
> Subject: Pyflink with pulsar
> To: user@flink.apache.org 
>
>
> Hello All,
>
> I am trying to build a pyflink application and I currently have a pulsar 
> instance that I need to connect and start streaming messages from.
>
> I was wondering if there is any advice regarding pulsar as a source connector 
> available via python ?
>
> Alternately, Pulsar seems to have a kafka protocol handler (KOP) and was 
> wondering if anyone has built a pyflink application streaming from pulsar 
> using the Kafka protocol ( By using pyflink kafka consumer ) ? If yes, could 
> you please share your experiences.
>
>
> Regards,
> Ananth
>
>


Re: Flink 1.15 deduplication view and lookup join

2022-02-17 Thread Yun Gao

Hi Francis,

I think requiring a primary key for the versioned table [1] used in a temporal join [2]
should be expected. May I double-check which table serves as the versioned
table in this case? Is it the streaming table from RabbitMQ or the joined data?

Best,
Yun



[1]  
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
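
For reference, a minimal sketch of the event-time temporal join pattern from [2],
assuming the deduplicated view from your mail below is the versioned (build) side and a
probe-side table named `measurements` with a watermarked `meas_time` column (both of
these names are placeholders):

SELECT m.device_id, v.unit_uuid, v.pf_uuid
FROM measurements AS m
JOIN versioned_endpoint_association FOR SYSTEM_TIME AS OF m.meas_time AS v
  ON m.device_id = v.device_id;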




 --Original Mail --
Sender:Francis Conroy 
Send Date:Thu Feb 17 11:27:01 2022
Recipients:user 
Subject:Flink 1.15 deduplication view and lookup join

Hi user group, 

I'm using flink 1.15 currently (we're waiting for it to be released) to build 
up some streaming pipelines and I'm trying to do a temporal lookup join.

I've got several tables(all with primary keys) defined which are populated by 
Debezium CDC data, let's call them a, b and c.

I've defined a view which joins all three tables to give some hierarchical 
association data rows like in the diagram.

This all works fine so far.
I'm trying to join this table with a table from a datastream, using a lookup 
join 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join)
 like follows:

I've added a time field to both tables now and I'm getting the following 
validation exception:
Temporal Table Join requires primary key in versioned table, but no primary key 
can be found.

I went and implemented another view on the joined data using the deduplication query
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication/#deduplication).

Here is my view definition: 
CREATE VIEW versioned_endpoint_association AS
SELECT device_id,
   leg_dt_id,
   ldt_id,
   ep_uuid,
   unit_uuid,
   pf_uuid,
   update_time
FROM (
SELECT *,
   ROW_NUMBER() OVER (PARTITION BY device_id
   ORDER BY update_time DESC) as rownum
  FROM endpoint_association)
WHERE rownum = 1;
After taking all these steps I still cannot get the temporal join to work. Am I missing
some detail that will tell Flink that versioned_endpoint_association should
in fact be interpreted as a versioned table?

Looking at the log it's important that there is a LogicalRank node which can 
convert to a Deduplicate node, but the conversion isn't happening. 




This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email and 
delete it from your system. You may not use, disseminate, distribute or copy 
this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia

Re: java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink

2022-02-17 Thread Yun Gao
Hi, 

I tried with a simplied version of the attached code, and it shows the
detailed exception is 

Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to 
java.time.Instant
 at org$apache$flink$api$java$tuple$Tuple4$1$Converter.toInternal(Unknown 
Source)
 at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:96)
 at 
org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:46)
 at 
org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
 at 
org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.toInternal(DataStructureConverterWrapper.java:51)
 at 
org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:92)
 ... 14 more

Thus it looks to me that the error is caused by the column f0 being of type
Long, which cannot be transformed into the
TIMESTAMP_LTZ type directly. To fix this error, one possible solution might be to
change the declaration of tupled4DsTable to

Table tupled4DsTable =
        tableEnv.fromDataStream(
                tuple4ds,
                Schema.newBuilder()
                        .columnByExpression(
                                "f0_ts",
                                Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
                        .column("f1", "BIGINT")
                        .column("f2", "STRING")
                        .column("f3", "STRING")
                        .watermark("f0_ts", "SOURCE_WATERMARK()")
                        .build())
                .as("eventTime", "handlingTime", "transactionId", "originalEvent");

Sorry I'm not an expert in Table / SQL and might miss or overlook something.


Best,
Yun


 --Original Mail --
Sender:HG 
Send Date:Thu Feb 17 22:07:07 2022
Recipients:user 
Subject:java.io.IOException: Failed to deserialize consumer record due to/ How 
to serialize table output for KafkaSink

Hello,

I have to convert the table to Datastream and try to do it with toAppendStream 
(just saw that it is deprecated ) 
But I have not been able to do the conversion as yet. (See the attached code).
Also my final Sink should be Kafka and the format ObjectNode/JSON.
So I need a serializer eventually.

What am I doing wrong? Can I convert to an ObjectNode with a serializer 
directly?
Both toAppendStream and toDataStream fail with the same error.

It fails with (shortened stack trace)
java.io.IOException: Failed to deserialize consumer record due to
at 
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
 ~[?:?]

Caused by: java.io.IOException: Failed to deserialize consumer record 
ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18, leaderEpoch 
= 0, offset = 27070, CreateTime = 1645105052689, serialized key size = -1, 
serialized value size = 3587, headers = RecordHeaders(headers = [], isReadOnly 
= false), key = null, value = [B@7fcb6863).
...
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
...
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
 ~[flink-dist_2.12-1.14.2.jar:1.14.2]
..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input 
conversion from external DataStream API to internal Table API data structures. 
Make sure that the provided data types that configure the converters are 
correctly declared in the schema. Affected record:
Table result = tableEnv.sqlQuery("select transactionId" +
", originalEvent" +
", handlingTime" +
", handlingTime - ifnull(lag(handlingTime) over (partition by 
transactionId order by eventTime), handlingTime) as elapsedTime" +
" from " + tupled4DsTable + " order by eventTime");

result.printSchema();

TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new
TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(), Types.LONG());
<-- deprecated and fails
DataStream<Tuple4<String, String, Long, Long>> dsRow =
tableEnv.toAppendStream(result, tupleType);
<-- deprecated and fails
DataStream<Row> xx = tableEnv.toDataStream(result); <-- fails with the same
error

Regards Hans

Re: Synchronization across tasks using checkpoint barriers

2022-02-17 Thread Gopi Krishna M
Just found that we can use prepareSnapshotPreBarrier
in AbstractStreamOperator to achieve this.
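
A minimal sketch of that idea (not the actual code from this thread; class, field and
helper names are illustrative, and the upload call is a placeholder):

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class PreBarrierFlushOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    // records buffered since the last checkpoint
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        buffer.add(element.getValue());
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Called just before the checkpoint barrier is forwarded downstream: upload the
        // buffered data and emit the resulting handle as a regular record, so the sink
        // receives it ahead of the same barrier.
        String handle = uploadToExternalService(buffer, checkpointId);
        output.collect(new StreamRecord<>(handle));
        buffer.clear();
    }

    // placeholder for the external upload described in the thread
    private String uploadToExternalService(List<String> data, long checkpointId) {
        return "handle-" + checkpointId;
    }
}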

On Tue, Feb 15, 2022 at 9:13 AM Gopi Krishna M 
wrote:

> CheckpointedFunction docs mention the following -
>
>> The snapshotState(FunctionSnapshotContext)
>> is called whenever a checkpoint takes a state snapshot of the
>> transformation function. Inside this method, functions typically make sure
>> that the checkpointed data structures (obtained in the initialization
>> phase) are up to date for a snapshot to be taken. The given snapshot
>> context gives access to the metadata of the checkpoint.
>>
>> *In addition, functions can use this method as a hook to
>> flush/commit/synchronize with external systems. *
>>
> Is there further documentation/examples of this synchronization?
>
> On Mon, Feb 14, 2022 at 10:50 PM Gopi Krishna M 
> wrote:
>
>>
>> On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler 
>> wrote:
>>
>>> So, you want to send basically the last message before the barrier?
>>>
>> Yes.
>>
>>
>>>
>>> Can you not instead send the first message after the barrier? From a
>>> first glance this sounds easier.
>>>
>> I'm not sure if this will help me synchronize the sink with the same
>> barrier.
>>
>>>
>>> Can you share what you are trying to accomplish?
>>>
>> Here's the objective I'm trying to achieve:
>> https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink
>>
>> Basically, I want to capture DB changes via CDC and update a parquet
>> table (in delta format) consistently at each checkpoint. So, the data is
>> first partitioned by primary key, each task handling a set of keys causes
>> new files to be written, then when sink waits for barrier from all tasks
>> which will follow the file names. Then the sink updates the delta table via
>> a transaction and then consumes the barrier.
>>
>>
>>>
>>> Best regards,
>>> Niklas
>>>
>>> > On 14. Feb 2022, at 17:04, Gopi Krishna M 
>>> wrote:
>>> >
>>> > Thanks Niklas! This helps with synchronizing uploads across
>>> partitioned tasks. The next step is to pass the handle to this upload to
>>> the sink which should be part of the same checkpoint. Is it possible to do
>>> the following:
>>> >
>>> > 1. Keep reducing the events to keyedStore.
>>> > 2. On snapshotState: upload the events and get the handle. Generate
>>> this handle as the output for the sink to consume.
>>> > 3. Return from snapshotState.
>>> >
>>> > Basically I want to ensure that the handle output is received by the
>>> next stage before this checkpoint barrier.
>>> >
>>> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler 
>>> wrote:
>>> > Hi Gopi,
>>> >
>>> > You can implement CheckpointedFunction and use the method
>>> snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>>> >
>>> >
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>>> >
>>> > Make sure, you don’t have unaligned checkpointing enabled.
>>> >
>>> > What it is:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
>>> > How to configure:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>>> >
>>> > Note that delays introduced in CheckpointedFunction#snapshotState can
>>> slow down the job.
>>> >
>>> > You can also get the snapshot id from the FunctionSnapshotContext.
>>> Maybe that removes the need for the source logic?
>>> >
>>> > Does this help?
>>> >
>>> > Best regards,
>>> > Niklas
>>> >
>>> >
>>> > > On 14. Feb 2022, at 05:27, Gopi Krishna M 
>>> wrote:
>>> > >
>>> > > Hi,
>>> > > In my flink operators, I need to connect to an external service to
>>> update state. I was thinking that the updates to the external service can
>>> be synchronized via checkpoint barriers.
>>> > >
>>> > > The topology of the stream is a source, then a single stage of
>>> operator replicas handling different partitions, then all joining in a
>>> single sink.
>>> > >
>>> > > Each operator will contact the external service when it receives a
>>> checkpoint barrier and uploads local state (which caches the uploads and
>>> returns a handle).
>>> > >
>>> > > After upload, it forwards the cache handle to the sink. Once sink
>>> receives handles from all such operators, it calls the external service
>>> with a list of handles received. This helps ensure that all handles are
>>> from the same checkpoint barrier.
>>> > >
>>> > > Is it possible to achieve this in a flink applicati

Apache Flink - Continuously monitoring directory using filesystem connector - 1.14.3

2022-02-17 Thread M Singh
Hi:  
I have a simple application and am using file system connector to monitor a 
directory and then print to the console (using datastream).  However, the 
application stops after reading the file in the directory (at the moment I have 
a single file in the directory).   I am using Apache Flink version 1.14.3.
I believe there is a configuration option to be used in the 'with' clause but I
could not find the right config - I tried 'streaming-source.enable' = 'true'
but that results in an exception.
I have also tried using EnvironmentSettings in streaming mode (as shown below) 
but still the application stops after reading the file in the directory.
Here is the code segment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestApplication {

    public static void main(String [] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(see, settings);

        tEnv.executeSql(
                "  CREATE TEMPORARY TABLE events (" +
                "  `event_id` STRING" +
                ")" +
                "WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = './src/main/resources/events/'," +
                "  'format' = 'json'" +
                ")"
        );

        Table events = tEnv.sqlQuery(
                "SELECT * from events"
        );
        tEnv.toDataStream(events).print("events");

        see.execute();
    }
}
Here is the console output:
events:7> +I[8b8fabde-45f5-4e94-b6af-7cd1396a11e9]
Process finished with exit code 0

Thanks

RE: Basic questions about resuming stateful Flink jobs

2022-02-17 Thread Schwalbe Matthias
Hi James,

Coming back to your original question on how to restart jobs from 
savepoints/checkpoints on LocalStreamEnvironment (the one used in a debugger):

Out of the box LocalStreamEnvironment does not allow setting a snapshot path to 
resume the job from.
The trick for me to do it anyway was to remodel the execute method and add a 
call to

jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(fromSavepoint,
 true))

(fromSavepoint being the savepointPath)

This is somewhat ugly but works (only ever used in debugger session, not in 
prod code).

The remodeled execute method looks like this (for Flink 1.13.0, and should be
similar for other releases): [1]


Feel free to get back with additional questions 😊

Thias

[1] remodeled execute(…) (scala):

  def execute(jobName: String): JobExecutionResult = {

    if (fromSavepoint != null && env.streamEnv.getJavaEnv.isInstanceOf[LocalStreamEnvironment]) {
      // transform the streaming program into a JobGraph
      val locEnv = env.streamEnv.getJavaEnv.asInstanceOf[LocalStreamEnvironment]
      val streamGraph = locEnv.getStreamGraph
      streamGraph.setJobName(jobName)

      val jobGraph = streamGraph.getJobGraph()
      jobGraph.setAllowQueuedScheduling(true)

      jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(fromSavepoint, true))

      val configuration = new org.apache.flink.configuration.Configuration
      configuration.addAll(jobGraph.getJobConfiguration)
      configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0")

      // add (and override) the settings with what the user defined
      val cls = classOf[LocalStreamEnvironment]
      val cfgField = cls.getDeclaredField("configuration")
      cfgField.setAccessible(true)
      val cofg = cfgField.get(locEnv).asInstanceOf[org.apache.flink.configuration.Configuration]
      configuration.addAll(cofg)

      if (!configuration.contains(RestOptions.BIND_PORT))
        configuration.setString(RestOptions.BIND_PORT, "0")

      val numSlotsPerTaskManager =
        configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism)

      val cfg = new MiniClusterConfiguration.Builder()
        .setConfiguration(configuration)
        .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
        .build

      val miniCluster = new MiniCluster(cfg)

      try {
        miniCluster.start()
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress.get.getPort)
        return miniCluster.executeJobBlocking(jobGraph)
      } finally {
        // transformations.clear
        miniCluster.close()
      }

    } else {
      throw new InvalidParameterException(
        "flink.stream-environment.from-savepoint may only be used for local debug execution")
    }
  }






From: Piotr Nowojski 
Sent: Thursday, 17 February 2022 09:23
To: Cristian Constantinescu 
Cc: Sandys-Lumsdaine, James ; James 
Sandys-Lumsdaine ; user@flink.apache.org
Subject: Re: Basic questions about resuming stateful Flink jobs

Hi James,

> Do I copy the checkpoint into a savepoint directory and treat it like a 
> savepoint?

You don't need to copy the checkpoint. Actually you can not do that, as 
checkpoints are not relocatable. But you can point to the checkpoint directory 
and resume from it like you would from a savepoint.

Regarding the testing, I would suggest taking a look at the docs [1] and 
MiniClusterWithClientResource in particular. If you are using it, you can 
access the cluster client (MiniClusterWithClientResource#getClusterClient) and 
this client should be an equivalent of the CLI/Rest API. You can also use it to 
recover from savepoints - check for `setSavepointRestoreSettings` usage in [2].

But the real question would be why do you want to do it? You might not 
necessarily need to test for recovery at this level. From a user code 
perspective, it doesn't matter if you use checkpoint/savepoint, where it's 
stored. IMO what you want to do is to have:

1. Proper unit tests using TestHarness(es)

Again, take a look at [1]. You can setup unit tests, process some records, 
carefully control timers, then call 
`AbstractStreamOperatorTestHarness#snapshot` to take snapshot and 
`AbstractStreamOperatorTestHarness#initializeState` to test the recovery code 
path. For examples you can take a look at usages of those methods in the Flink 
code base. For example [3].

2. Later, I would recommend complementing such unit tests with some end-to-end 
tests, that would make sure everything is integrated properly, that your 
cluster is configured correctly etc. Then you don't need to use MiniCluster, as 
you can simply use Rest API/CLI. But crucially you don't need to be so thorough 
with covering all of the cases on this level, especially the failure handling, 
as you can rely more on the unit tests. Having said that, you might want to 
have a test that kills/restarts one TM on an end-to-end level.

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/

java.io.IOException: Failed to deserialize consumer record due to/ How to serialize table output for KafkaSink

2022-02-17 Thread HG
Hello,

I have to convert the table to Datastream and try to do it with
toAppendStream (just saw that it is deprecated )
But I have not been able to do the conversion as yet. (See the attached
code).
Also my final Sink should be Kafka and the format ObjectNode/JSON.
So I need a serializer eventually.

What am I doing wrong? Can I convert to an ObjectNode with a serializer
directly?
Both toAppendStream and toDataStream fail with the same error.

It fails with (shortened stack trace)
java.io.IOException: Failed to deserialize consumer record due to
at
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
~[?:?]

Caused by: java.io.IOException: Failed to deserialize consumer record
ConsumerRecord(topic = cag-verwerkingstijden-input, partition = 18,
leaderEpoch = 0, offset = 27070, CreateTime = 1645105052689, serialized key
size = -1, serialized value size = 3587, headers = RecordHeaders(headers =
[], isReadOnly = false), key = null, value = [B@7fcb6863).
...
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
...
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
..
Caused by: org.apache.flink.util.FlinkRuntimeException: Error during input
conversion from external DataStream API to internal Table API data
structures. Make sure that the provided data types that configure the
converters are correctly declared in the schema. Affected record:

Table result = tableEnv.sqlQuery("select transactionId" +
", originalEvent" +
", handlingTime" +
", handlingTime - ifnull(lag(handlingTime) over (partition by
transactionId order by eventTime), handlingTime) as elapsedTime" +
" from " + tupled4DsTable + " order by eventTime");

result.printSchema();


TupleTypeInfo<Tuple4<String, String, Long, Long>> tupleType = new
TupleTypeInfo<>(Types.STRING(), Types.STRING(), Types.LONG(),
Types.LONG()); <-- deprecated and fails

DataStream<Tuple4<String, String, Long, Long>> dsRow = tableEnv.toAppendStream(result,
tupleType); <-- deprecated and fails

DataStream<Row> xx = tableEnv.toDataStream(result); <-- fails with
the same error

Regards Hans


code.java
Description: Binary data


Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Chesnay Schepler
You could reduce the log level of the PermanentBlobCache to WARN via the 
Log4j configuration.

I think you could even filter this specific message with Log4j.
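
For example, something along these lines in the Log4j 2 properties file shipped with
Flink (conf/log4j.properties); an untested sketch, where the logger name is simply the
class that emits the message:

logger.permanentblobcache.name = org.apache.flink.runtime.blob.PermanentBlobCache
logger.permanentblobcache.level = WARN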

On 17/02/2022 14:35, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:


Thanks,

I understand that the functionality isn’t affected, this is very good news.

But is there a way to either skip this check or skip logging it? We
see it in our log more than 400 times per task manager.

It would be very helpful if the log level could be reduced, or the
check could be skipped? Is there any way to achieve this?

Thanks
Noa

From: Chesnay Schepler
Date: Thursday, 17 February 2022 at 15:00
To: Koffman, Noa (Nokia - IL/Kfar Sava), Yun Gao, user

Subject: Re: Task manager errors with Flink ZooKeeper High Availability

Everything is fine.

The TM tries to retrieve the jar (aka, the blob), and there is a fast 
path to access it directly from storage. This fails (because it has no 
access to it), and then falls back to retrieving it from the JM.


On 17/02/2022 13:49, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:

Hi,

Thanks for your reply,

Please see below the full stack trace, and the log message right
after, it looks like it is trying to download via BlobClient after
failing to download from store, as you have suggested.

My question is, is there a way to avoid this attempt to copy from
blob store? Is my configuration of task manager wrong?

Currently we are using the same flink-conf.yaml file for both job
manager and task managers, which include the high-availability
configuration mentioned below, should these be removed from the
task managers?

2022-02-17 07:19:45,408 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Failed to copy from blob store. Downloading from BLOB server instead.

java.io.FileNotFoundException: /flink_state/default/blob/job_0ddba6dd21053567981e11bda8da7c8e/blob_p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76 (No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:?]
at java.io.FileInputStream.open(Unknown Source) ~[?:?]
at java.io.FileInputStream.(Unknown Source) ~[?:?]
at org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:88) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:145) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.5.jar:1.13.5]
at java.lang.Thread.run(Unknown Source) [?:?]

2022-02-17 07:19:45,408 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 0ddba6dd21053567981e11bda8da7c8e/p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76 from 10-1-10-213.noa-edge-infra-flink-jobmanager.noa-edge.svc.cluster.local/10.1.10.213:6124

Thanks

Noa

From: Yun Gao
Date: Thursday, 17 February 2022 at 14:04
To: Koffman, Noa (Nokia - IL/Kfar Sava), user

Subject: Re: Task manager errors with Flink ZooKeeper High Availability

Hi Koffman,

Fro

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Thanks,
I understand that the functionality isn’t affected, this is very good news.
But is there a way to either skip this check or skip logging it? We see it in 
our log more than 400 times per task manager.
It would be very helpful if the log level could be reduced, or the check could 
be skipped? Is there any way to achieve this?

Thanks
Noa

From: Chesnay Schepler 
Date: Thursday, 17 February 2022 at 15:00
To: Koffman, Noa (Nokia - IL/Kfar Sava) , Yun Gao 
, user 
Subject: Re: Task manager errors with Flink ZooKeeper High Availability
Everything is fine.

The TM tries to retrieve the jar (aka, the blob), and there is a fast path to 
access it directly from storage. This fails (because it has no access to it), 
and then falls back to retrieving it from the JM.

On 17/02/2022 13:49, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:
Hi,
Thanks for your reply,
Please see below the full stack trace, and the log message right after, it 
looks like it is trying to download via BlobClient after failing to download 
from store, as you have suggested.
My question is, is there a way to avoid this attempt to copy from blob store? 
Is my configuration of task manager wrong?
Currently we are using the same flink-conf.yaml file for both job manager and 
task managers, which include the high-availability configuration mentioned 
below, should these be removed from the task managers?


2022-02-17 07:19:45,408 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
   [] - Failed to copy from blob store. Downloading from BLOB server 
instead.
java.io.FileNotFoundException: 
/flink_state/default/blob/job_0ddba6dd21053567981e11bda8da7c8e/blob_p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:?]
at java.io.FileInputStream.open(Unknown Source) ~[?:?]
at java.io.FileInputStream.(Unknown Source) ~[?:?]
at 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) 
~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:88)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:145)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
at java.lang.Thread.run(Unknown Source) [?:?]
2022-02-17 07:19:45,408 INFO  org.apache.flink.runtime.blob.BlobClient  
   [] - Downloading 
0ddba6dd21053567981e11bda8da7c8e/p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76
 from 
10-1-10-213.noa-edge-infra-flink-jobmanager.noa-edge.svc.cluster.local/10.1.10.213:6124


Thanks
Noa


From: Yun Gao 
Date: Thursday, 17 February 2022 at 14:04
To: Koffman, Noa (Nokia - IL/Kfar Sava) 
, user 

Subject: Re: Task manager errors with Flink ZooKeeper High Availability
Hi Koffman,

From TM side the only possible usage come to me is that or components like
BlobCache, which is used to
transfer jars or large task informations between JM and TM. But specially for 
BlobService, if it failed to find
the file it would turn to JM via http connection. If convenient could you also 
post the stac

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Chesnay Schepler

Everything is fine.

The TM tries to retrieve the jar (aka, the blob), and there is a fast 
path to access it directly from storage. This fails (because it has no 
access to it), and then falls back to retrieving it from the JM.


On 17/02/2022 13:49, Koffman, Noa (Nokia - IL/Kfar Sava) wrote:


Hi,

Thanks for your reply,

Please see below the full stack trace, and the log message right
after, it looks like it is trying to download via BlobClient after
failing to download from store, as you have suggested.

My question is, is there a way to avoid this attempt to copy from blob
store? Is my configuration of task manager wrong?

Currently we are using the same flink-conf.yaml file for both job
manager and task managers, which include the high-availability
configuration mentioned below, should these be removed from the task
managers?


2022-02-17 07:19:45,408 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Failed to copy from blob store. Downloading from BLOB server instead.

java.io.FileNotFoundException: /flink_state/default/blob/job_0ddba6dd21053567981e11bda8da7c8e/blob_p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76 (No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:?]
at java.io.FileInputStream.open(Unknown Source) ~[?:?]
at java.io.FileInputStream.(Unknown Source) ~[?:?]
at org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:88) ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:145) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) [flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) [flink-dist_2.11-1.13.5.jar:1.13.5]
at java.lang.Thread.run(Unknown Source) [?:?]

2022-02-17 07:19:45,408 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 0ddba6dd21053567981e11bda8da7c8e/p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76 from 10-1-10-213.noa-edge-infra-flink-jobmanager.noa-edge.svc.cluster.local/10.1.10.213:6124

Thanks

Noa

From: Yun Gao
Date: Thursday, 17 February 2022 at 14:04
To: Koffman, Noa (Nokia - IL/Kfar Sava), user

Subject: Re: Task manager errors with Flink ZooKeeper High Availability

Hi Koffman,

From TM side the only possible usage come to me is that or components
like BlobCache, which is used to
transfer jars or large task informations between JM and TM. But
specially for BlobService, if it failed to find
the file it would turn to JM via http connection. If convenient could
you also post the stack of the exception
and may I have a double confirmation whether the job could still
running normally with this exception?

Sorry that I might miss something~

Best,
Yun

--Original Mail --

Sender: Koffman, Noa (Nokia - IL/Kfar Sava)
Send Date: Thu Feb 17 05:00:42 2022
Recipients: user
Subject: Task manager errors with Flink ZooKeeper High Availability

Hi,

We are currently running flink in session deployment on k8s
cluster, with 1 job-manager and 3 task-managers

To support recovery from job-manager failure, following a
different mail thread,

We have enabled zookeeper high availability using a k8s
Persistent Volume

To achieve this, we’ve added these conf values:

/high-ava

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Koffman, Noa (Nokia - IL/Kfar Sava)
Hi,
Thanks for your reply,
Please see below the full stack trace, and the log message right after, it 
looks like it is trying to download via BlobClient after failing to download 
from store, as you have suggested.
My question is, is there a way to avoid this attempt to copy from blob store? 
Is my configuration of task manager wrong?
Currently we are using the same flink-conf.yaml file for both job manager and 
task managers, which include the high-availability configuration mentioned 
below, should these be removed from the task managers?


2022-02-17 07:19:45,408 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
   [] - Failed to copy from blob store. Downloading from BLOB server 
instead.
java.io.FileNotFoundException: 
/flink_state/default/blob/job_0ddba6dd21053567981e11bda8da7c8e/blob_p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76
 (No such file or directory)
at java.io.FileInputStream.open0(Native Method) ~[?:?]
at java.io.FileInputStream.open(Unknown Source) ~[?:?]
at java.io.FileInputStream.(Unknown Source) ~[?:?]
at 
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) 
~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:106)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:88)
 ~[flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:145)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:187)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.createUserCodeClassLoader(BlobLibraryCacheManager.java:251)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:228)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:199)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:333)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
 [flink-dist_2.11-1.13.5.jar:1.13.5]
at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:628) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
[flink-dist_2.11-1.13.5.jar:1.13.5]
at java.lang.Thread.run(Unknown Source) [?:?]
2022-02-17 07:19:45,408 INFO  org.apache.flink.runtime.blob.BlobClient  
   [] - Downloading 
0ddba6dd21053567981e11bda8da7c8e/p-4ff16c9e641ba8803e55d62a6ab2f6d05512373e-92d38bfb5719024adf4c72b086184d76
 from 
10-1-10-213.noa-edge-infra-flink-jobmanager.noa-edge.svc.cluster.local/10.1.10.213:6124


Thanks
Noa


From: Yun Gao 
Date: Thursday, 17 February 2022 at 14:04
To: Koffman, Noa (Nokia - IL/Kfar Sava) , user 

Subject: Re: Task manager errors with Flink ZooKeeper High Availability
Hi Koffman,

From TM side the only possible usage come to me is that or components like
BlobCache, which is used to
transfer jars or large task informations between JM and TM. But specially for 
BlobService, if it failed to find
the file it would turn to JM via http connection. If convenient could you also 
post the stack of the exception
and may I have a double confirmation whether the job could still running 
normally with this exception?

Sorry that I might miss something~

Best,
Yun



--Original Mail --
Sender:Koffman, Noa (Nokia - IL/Kfar Sava) 
Send Date:Thu Feb 17 05:00:42 2022
Recipients:user 
Subject:Task manager errors with Flink ZooKeeper High Availability

Hi,
We are currently running flink in session deployment on k8s cluster, with 1 
job-manager and 3 task-managers
To support recovery from job-manager failure, following a different mail thread,
We have enabled zookeeper high availability using a k8s Persistent Volume

To achieve this, we’ve added these conf values:
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-noa-edge-infra:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: /flink_state
high-availability.jobmanag

Re: Task manager errors with Flink ZooKeeper High Availability

2022-02-17 Thread Yun Gao
Hi Koffman,

From the TM side, the only possible usage that comes to mind is that of components like
BlobCache, which is used to
transfer jars or large task information between JM and TM. But especially for
the BlobService, if it fails to find
the file it will turn to the JM via an http connection. If convenient, could you also
post the stack of the exception,
and may I have a double confirmation on whether the job is still running
normally with this exception?

Sorry that I might miss something~

Best,
Yun




 --Original Mail --
Sender:Koffman, Noa (Nokia - IL/Kfar Sava) 
Send Date:Thu Feb 17 05:00:42 2022
Recipients:user 
Subject:Task manager errors with Flink ZooKeeper High Availability


Hi, 
We are currently running flink in session deployment on k8s cluster, with 1 
job-manager and 3 task-managers
To support recovery from job-manager failure, following a different mail thread,
We have enabled zookeeper high availability using a k8s Persistent Volume

To achieve this, we’ve added these conf values: 
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-noa-edge-infra:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: /flink_state
high-availability.jobmanager.port: 6150
for the storageDir, we are using a k8s persistent volume with ReadWriteOnce

Recovery of job-manager failure is working now, but it looks like there are 
issues with the task-managers:
The same configuration file is used in the task-managers as well, and there are 
a lot of error in the task-manager’s logs –
java.io.FileNotFoundException: 
/flink_state/flink/blob/job_9f4be579c7ab79817e25ed56762b7623/blob_p-5cf39313e388d9120c235528672fd267105be0e0-938e4347a98aa6166dc2625926fdab56
 (No such file or directory)

It seems that the task-managers are trying to access the job-manager’s storage 
dir – can this be avoided?
The task manager does not have access to the job manager persistent volume – is 
this mandatory?
If we don’t have the option to use shared storage, is there a way to make 
zookeeper hold and manage the job states, instead of using the shared storage?

Thanks
Noa
 


Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-17 Thread HG
Well, I thought that in order to do the same with only the DataStream API I
would need to use MapPartitionFunction.



Op do 17 feb. 2022 om 10:41 schreef Francesco Guardiani <
france...@ververica.com>:

> Why do you need MapPartitionFunction?
>
> On Wed, Feb 16, 2022 at 7:02 PM HG  wrote:
>
>> Thanks
>>
>> Would the option for datastream be to write a MapPartitionFunction?
>>
>> Op wo 16 feb. 2022 om 16:35 schreef Francesco Guardiani <
>> france...@ververica.com>:
>>
>>> > Which does not work since it cannot find lag function :-(
>>>
>>> lag and over are not supported at the moment with Table, so you need to
>>> use SQL for that.
>>>
>>> > *Will this obey the watermark strategy of the original Datastream?
>>> (see further below)*
>>>
>>> Yes. The code at the end of the mail is correct and should work fine. I
>>> have just one comment, if you're using this DataStream only to create the
>>> Table instance, you could also just define the watermark using the Schema
>>> builder itself, as described here:
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>>
>>> On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:
>>>
 Hello all

 I need to calculate the difference in time between ordered rows per
 transactionId. All events should arrive within the timeframe set by the
 out-of-orderness ( a couple of minutes). Events outside this time should be
 ignored.

 In SQL this would be :
 select transactionId  , handlingTime , *handlingTime -
 lag(handlingTime,1) over (partition by transactionId order by handlingTime)
 as elapsedTime*
 from table

 When I code :
 Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
 *handlingTime
 - if(null(lag(handlingTime) over (partition by transactionId order by
 handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")

 *Will this obey the watermark strategy of the original Datastream? (see
 further below)*
 I have also tried to use the Table Api with a session window like :
 Table t = tupled3DsTable
.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as(
 "w")).groupBy($("transactionId"), $("w"))
.select($("handlingTime"), $("transactionId"), $("originalEvent"),
 $("handlingTime").max().over($("w")));
 This gives:
 org.apache.flink.client.program.ProgramInvocationException: The main
 method caused an error: Could not resolve over call.
 at
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

 and also :
 Table t = tupled3DsTable
 .window(Over.partitionby($("transactionId")).orderBy($(
 "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"),
 $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"
 ));
 Which does not work since it cannot find lag function :-(

 In java I have the following setup:
 WatermarkStrategy> wmstrategy =
 WatermarkStrategy
 .>>> String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
 .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
 .withTimestampAssigner(new 
 SerializableTimestampAssigner>>> String, String>>() {
 @Override
 public long extractTimestamp(Tuple3
 element, long handlingTime) {
 return element.f0;
  }});

 DataStream> tuple3dswm = 
 tuple3ds.assignTimestampsAndWatermarks(wmstrategy);

 Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
 Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
  "SOURCE_WATERMARK()")
 .build()).as("handlingTime", "transactionId", "originalEvent");








Re: SQL / Table Api lag() over partition by ... and windowing

2022-02-17 Thread Francesco Guardiani
Why do you need MapPartitionFunction?

On Wed, Feb 16, 2022 at 7:02 PM HG  wrote:

> Thanks
>
> Would the option for datastream be to write a MapPartitionFunction?
>
> Op wo 16 feb. 2022 om 16:35 schreef Francesco Guardiani <
> france...@ververica.com>:
>
>> > Which does not work since it cannot find lag function :-(
>>
>> lag and over are not supported at the moment with Table, so you need to
>> use SQL for that.
>>
>> > *Will this obey the watermark strategy of the original Datastream?
>> (see further below)*
>>
>> Yes. The code at the end of the mail is correct and should work fine. I
>> have just one comment, if you're using this DataStream only to create the
>> Table instance, you could also just define the watermark using the Schema
>> builder itself, as described here:
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/Schema.Builder.html#watermark-java.lang.String-org.apache.flink.table.expressions.Expression-
>>
>> On Wed, Feb 16, 2022 at 2:35 PM HG  wrote:
>>
>>> Hello all
>>>
>>> I need to calculate the difference in time between ordered rows per
>>> transactionId. All events should arrive within the timeframe set by the
>>> out-of-orderness ( a couple of minutes). Events outside this time should be
>>> ignored.
>>>
>>> In SQL this would be :
>>> select transactionId  , handlingTime , *handlingTime -
>>> lag(handlingTime,1) over (partition by transactionId order by handlingTime)
>>> as elapsedTime*
>>> from table
>>>
>>> When I code :
>>> Table result = tableEnv.sqlQuery("select transactionId, handlingTime, 
>>> *handlingTime
>>> - if(null(lag(handlingTime) over (partition by transactionId order by
>>> handlingTime),handlingTime) as elapsedTime* from tupled3DsTableVw")
>>>
>>> *Will this obey the watermark strategy of the original Datastream? (see
>>> further below)*
>>> I have also tried to use the Table Api with a session window like :
>>> Table t = tupled3DsTable
>>>.window(Session.withGap(lit(2).minutes()).on($("handlingTime")).as(
>>> "w")).groupBy($("transactionId"), $("w"))
>>>.select($("handlingTime"), $("transactionId"), $("originalEvent"), $(
>>> "handlingTime").max().over($("w")));
>>> This gives:
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Could not resolve over call.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>>
>>> and also :
>>> Table t = tupled3DsTable
>>> .window(Over.partitionby($("transactionId")).orderBy($(
>>> "handlingTime")).as("w")).select($("handlingTime"), $("transactionId"),
>>> $("originalEvent"), $("handlingTime").lag().as("previousHandlingTime"));
>>> Which does not work since it cannot find lag function :-(
>>>
>>> In java I have the following setup:
>>> WatermarkStrategy> wmstrategy =
>>> WatermarkStrategy
>>> .>> String>>forBoundedOutOfOrderness(Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
>>> .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
>>> .withTimestampAssigner(new 
>>> SerializableTimestampAssigner>> String, String>>() {
>>> @Override
>>> public long extractTimestamp(Tuple3
>>> element, long handlingTime) {
>>> return element.f0;
>>>  }});
>>>
>>> DataStream> tuple3dswm = 
>>> tuple3ds.assignTimestampsAndWatermarks(wmstrategy);
>>>
>>> Table tupled3DsTable = tableEnv.fromDataStream(tuple3ds, 
>>> Schema.newBuilder().column("f0","TIMESTAMP_LTZ(3)").column("f1","STRING").column("f2","STRING").watermark("f0",
>>>  "SOURCE_WATERMARK()")
>>> .build()).as("handlingTime", "transactionId", "originalEvent");
>>>
>>>
>>>
>>>
>>>
>>>


Re: Apache Flink - User Defined Functions - Exception when passing all arguments

2022-02-17 Thread Francesco Guardiani
Hi,

The SQL syntax is not supported, as the SQL standard itself does not allow
it. It sounds strange that it fails at the validation phase rather than during
parsing, but it shouldn't work anyway.

I suggest you to just use Table API for that, as it's richer. You can even
use withColumns(range(..)) which gives you more control.
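
For example, a sketch reusing the function and table name from the question below (not
tested):

import static org.apache.flink.table.api.Expressions.*;

Table concatAll = tEnv.from("test_table")
        .select(call(ConcatenateFunction.class, $("*")));

// or, with explicit control over which columns are passed (indexes assumed):
Table concatSome = tEnv.from("test_table")
        .select(call(ConcatenateFunction.class, withColumns(range(1, 3))));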

Hope it helps,
FG

On Thu, Feb 17, 2022 at 1:34 AM M Singh  wrote:

> Hi:
>
> I have a simple concatenate UDF (for testing purpose) defined as:
>
> public static class ConcatenateFunction extends ScalarFunction {
> public String eval(@DataTypeHint(inputGroup = InputGroup.ANY)
> Object ... inputs) {
> return Arrays.stream(inputs).map(i -> i.toString()).collect(
> Collectors.joining(","));
> }
> }
>
>
> and register it with the streaming table env:
>
> tEnv.createTemporarySystemFunction("concatenateFunction",
> ConcatenateFunction.class);
>
> However when I call the function as shown below - I get an exception
> indicating that the '*' is  an unknown identifier as shown below.
>
> Table concat = tEnv.sqlQuery(
> "SELECT  concatenateFunction(*) " +
> "FROM test_table"
> );
>
>
> I am printing the rows at the end of the test application.
>
> The exception is:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. At line 1, column 29: Unknown identifier '*'
>
>
> The document (User-defined Functions)
> shows how to call the function with all args using scala/java:
>
> env.from("MyTable").select(call(MyConcatFunction.class, $("*")));
>
>
> But I could not find how to call the UDF using SQL syntax as shown above
> (select concatenateFunction(*) from test_table).
>
> Can you please let me know if there a way to pass all arguments to a UDF
> in SQL  ?
>
> Thanks
>


Pyflink with pulsar

2022-02-17 Thread Ananth Gundabattula
Hello All,

I am trying to build a pyflink application and I currently have a pulsar 
instance that I need to connect and start streaming messages from.

I was wondering if there is any advice regarding pulsar as a source connector 
available via python ?

Alternately, Pulsar seems to have a kafka protocol handler (KOP) and was 
wondering if anyone has built a pyflink application streaming from pulsar using 
the Kafka protocol ( By using pyflink kafka consumer ) ? If yes, could you 
please share your experiences.


Regards,
Ananth




Re: Basic questions about resuming stateful Flink jobs

2022-02-17 Thread Piotr Nowojski
Hi James,

> Do I copy the checkpoint into a savepoint directory and treat it like a
savepoint?

You don't need to copy the checkpoint. Actually you can not do that, as
checkpoints are not relocatable. But you can point to the checkpoint
directory and resume from it like you would from a savepoint.

Regarding the testing, I would suggest taking a look at the docs [1] and
MiniClusterWithClientResource in particular. If you are using it, you can
access the cluster client (MiniClusterWithClientResource#getClusterClient)
and this client should be an equivalent of the CLI/Rest API. You can also
use it to recover from savepoints - check for `setSavepointRestoreSettings`
usage in [2].

But the real question would be why do you want to do it? You might not
necessarily need to test for recovery at this level. From a user code
perspective, it doesn't matter if you use checkpoint/savepoint, where it's
stored. IMO what you want to do is to have:

1. Proper unit tests using TestHarness(es)

Again, take a look at [1]. You can setup unit tests, process some records,
carefully control timers, then
call `AbstractStreamOperatorTestHarness#snapshot` to take snapshot
and `AbstractStreamOperatorTestHarness#initializeState` to test the
recovery code path. For examples you can take a look at usages of those
methods in the Flink code base. For example [3].
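
A minimal sketch of that snapshot/restore round trip (not from this thread; it uses a
trivial built-in StreamMap operator and assumes the flink-streaming-java test utilities
are on the test classpath):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class SnapshotRestoreSketch {
    public static void main(String[] args) throws Exception {
        MapFunction<String, String> toUpper = String::toUpperCase;

        OneInputStreamOperatorTestHarness<String, String> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamMap<>(toUpper));
        harness.open();
        harness.processElement("a", 1L);

        // take a snapshot as of (checkpointId = 1, timestamp = 10)
        OperatorSubtaskState snapshot = harness.snapshot(1L, 10L);
        harness.close();

        // initialize a fresh harness from that snapshot to exercise the recovery path
        OneInputStreamOperatorTestHarness<String, String> restored =
                new OneInputStreamOperatorTestHarness<>(new StreamMap<>(toUpper));
        restored.initializeState(snapshot);
        restored.open();
        restored.processElement("b", 2L);
        restored.close();
    }
}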

2. Later, I would recommend complementing such unit tests with some
end-to-end tests, that would make sure everything is integrated properly,
that your cluster is configured correctly etc. Then you don't need to use
MiniCluster, as you can simply use Rest API/CLI. But crucially you don't
need to be so thorough with covering all of the cases on this level,
especially the failure handling, as you can rely more on the unit tests.
Having said that, you might want to have a test that kills/restarts one TM
on an end-to-end level.

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/
[2]
https://github.com/apache/flink/blob/cd8ea8d5b207569f68acc5a3c8db95cd2ca47ba6/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
[3]
https://github.com/apache/flink/blob/fdf40d2e0efe2eed77ca9633121691c8d1e744cb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java

śr., 16 lut 2022 o 21:57 Cristian Constantinescu 
napisał(a):

> Hi James,
>
> I literally just went through what you're doing at my job. While I'm using
> Apache Beam and not the Flink api directly, the concepts still apply.
> TL;DR: it works as expected.
>
> What I did is I set up a kafka topic listener that always throws an
> exception if the last received message's timestamp is less than 5 minutes
> from when the processing happens (basically simulating a code fix after 5
> minutes). Then I let the pipeline execute the normal processing and I'd
> send a message on the exception topic.
>
> I have set up flink to retry twice, Beam offers a flag
> (numberOfExecutionRetries) [1] but it boils down to one of the Flink flags
> here [2]. What that does is that once Flink encounters an exception, say
> for example like my exception throwing topic, it will restore itself from
> the last checkpoint which includes kafka offsets and other things that
> transforms might have in there. Effectively this replays the messages after
> the checkpoint, and of course, my exception is thrown again when it tries
> to reprocess that message. After the second try, Flink will give up and the
> Flink job will stop (just like if you cancel it). If ran in an IDE, process
> will stop, if ran on a Flink cluster, the job will stop.
>
> When a Flink job stops, it usually clears up its checkpoints, unless you
> externalize them, for Beam it's the externalizedCheckpointsEnabled flag set
> to true. Check the docs to see what that maps to.
>
> Then, when you restart the flink job, just add the -s Flink flag followed
> by the latest checkpoint path. If you're running from an IDE, say IntelliJ,
> you can still pass the -s flag to Main method launcher.
>
> We use a bash script to restart or Flink jobs in our UAT/PROD boxes for
> now, you can use this command: find "$PATH_WHERE_YOU_SAVE_STATE" -name
> "_metadata" -print0 | xargs -r -0 ls -1 -t | head -1 to find the latest
> checkpoint in that path. And you know where PATH_WHERE_YOU_SAVE_STATE is,
> because you have to specify it when you initially start the flink job. For
> Beam, that's the stateBackendStoragePath flag. This is going to pick up the
> latest checkpoint before the pipeline stopped and will continue from it
> with your updated jar that handles the exception properly.
>
> Also note that I think you can set all these flags with Java code. In Beam
> it's just adding to the Main method args parameter or adding them to the
> PipelineOptions once you build that object from args. I've never used the
> Flink libs, just the runner, but from [1] and [3] it looks like you can
> confi