Re: NullPointerException in StateTable.put()

2021-08-17 Thread László Ciople
I removed that line from the code and it seems to have solved the problem.
Thank you very much! :)
All the best,
Laszlo

On Tue, Aug 17, 2021 at 9:54 AM László Ciople 
wrote:

> Ok, thank you for the tips. I will modify it and get back to you :)
>
> On Tue, Aug 17, 2021 at 9:42 AM David Morávek  wrote:
>
>> Hi Laszlo,
>>
>> Please use reply-all for mailing list replies. This may help others
>> find their answers in the future ;)
>>
>>
>>> sb.append(DeviceDetail.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('[');
>>
>>
>> This part will again make your key non-deterministic, because you're
>> using a memory address inside the content for hashing. I don't see any
>> other problem in the snippet you've sent.
>>
>> Best,
>> D.
>>
>> On Tue, Aug 17, 2021 at 8:33 AM László Ciople 
>> wrote:
>>
>>> I modified the code to use a sha256 hash instead of the hashCode when
>>> the id is not present in the object. The same behaviour still occurred.
>>> Here is the code that selects the key:
>>> @Override
>>> public String getKey(AzureADIamEvent value) throws Exception {
>>>     // key is the device id or the hash of the device properties
>>>     String key = value.payload.properties.deviceDetail.deviceId;
>>>
>>>     if (key == null || key.equals("")) {
>>>         LOG.warn("Device id is null or empty, using sha256 value");
>>>         key = DigestUtils.sha256Hex(value.payload.properties.deviceDetail.toString());
>>>     }
>>>
>>>     return key;
>>> }
>>>
>>> And the definition of the class the key is created from:
>>> public class DeviceDetail {
>>>     @JsonProperty("browser")
>>>     public String browser;
>>>     @JsonProperty("deviceId")
>>>     public String deviceId;
>>>     @JsonProperty("displayName")
>>>     public String displayName;
>>>     @JsonProperty("operatingSystem")
>>>     public String operatingSystem;
>>>     @JsonProperty("trustType")
>>>     public String trustType;
>>>
>>>     @Override
>>>     public String toString() {
>>>         StringBuilder sb = new StringBuilder();
>>>         sb.append(DeviceDetail.class.getName()).append('@')
>>>                 .append(Integer.toHexString(System.identityHashCode(this))).append('[');
>>>         sb.append("browser").append('=').append(this.browser == null ? "" : this.browser).append(',');
>>>         sb.append("deviceId").append('=').append(this.deviceId == null ? "" : this.deviceId).append(',');
>>>         sb.append("displayName").append('=').append(this.displayName == null ? "" : this.displayName).append(',');
>>>         sb.append("operatingSystem").append('=').append(this.operatingSystem == null ? "" : this.operatingSystem).append(',');
>>>         sb.append("trustType").append('=').append(this.trustType == null ? "" : this.trustType).append(',');
>>>         if (sb.charAt(sb.length() - 1) == ',') {
>>>             sb.setCharAt(sb.length() - 1, ']');
>>>         } else {
>>>             sb.append(']');
>>>         }
>>>         return sb.toString();
>>>     }
>>> }
>>>
>>>
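For the record, the fix reported at the top of this thread was removing the
identityHashCode segment from toString(), so that the hashed content depends
only on the field values. A minimal sketch of such a deterministic variant
(field names taken from the snippet above):

```
@Override
public String toString() {
    // No memory address in the output, so DigestUtils.sha256Hex(toString())
    // produces a stable key for equal field values.
    return String.join(",",
            browser == null ? "" : browser,
            deviceId == null ? "" : deviceId,
            displayName == null ? "" : displayName,
            operatingSystem == null ? "" : operatingSystem,
            trustType == null ? "" : trustType);
}
```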


Re: Problems with reading ORC files with S3 filesystem

2021-08-17 Thread Piotr Jagielski
Hi David,

Thanks for your answer. I finally managed to read ORC files by:
- switching to s3a:// in my Flink SQL table path parameter
- providing all the properties in Hadoop's core-site.xml file (fs.s3a.endpoint, 
fs.s3a.path.style.access, fs.s3a.aws.credentials.provider, fs.s3a.access.key, 
fs.s3a.secret.key)
- setting HADOOP_CONF_DIR env variable pointing to directory containing 
core-site.xml
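For reference, a minimal core-site.xml sketch covering those properties (the
endpoint, credentials, and provider class are placeholders for a MinIO-style
setup):

```
<configuration>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>http://minio:9000</value>
  </property>
  <property>
    <name>fs.s3a.path.style.access</name>
    <value>true</value>
  </property>
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>SECRET_KEY</value>
  </property>
</configuration>
```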

Regards,
Piotr

On 2021/08/16 09:07:48, David Morávek  wrote: 
> Hi Piotr,
> 
> unfortunately this is a known long-standing issue [1]. The problem is that
> the ORC format is not using Flink's filesystem abstraction for the actual
> reading of the underlying file, so you have to adjust your Hadoop config
> accordingly. There is also a corresponding SO question [2] covering this.
> 
> I think a proper fix would actually require changing the interface on the
> ORC side, because currently there seems to be no easy way to switch the FS
> implementation (I've just quickly checked the OrcFile class, so this might
> not be 100% accurate).
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10989
> [2] https://stackoverflow.com/a/53435359
> 
> Best,
> D.
> 
> On Sat, Aug 14, 2021 at 11:40 AM Piotr Jagielski  wrote:
> 
> > Hi,
> > I want to use Flink SQL filesystem to read ORC file via S3 filesystem on
> > Flink 1.13. My table definition looks like this:
> >
> > create or replace table xxx
> >  (..., startdate string)
> >  partitioned by (startdate) with ('connector'='filesystem',
> > 'format'='orc', 'path'='s3://xxx/orc/yyy')
> >
> > I followed Flink's S3 guide and installed the S3 libs as a plugin. I have
> > MinIO as the S3 provider and it works for Flink's checkpoints and HA files.
> > The SQL connector also works when I use CSV or Avro formats. The problems
> > start with ORC:
> >
> > 1. If I just put flink-orc on job's classpath I get error on JobManager:
> > Caused by: java.lang.NoClassDefFoundError:
> > org/apache/hadoop/conf/Configuration
> > at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:121)
> > ~[?:?]
> > at
> > org.apache.flink.orc.OrcFileFormatFactory$1.createRuntimeDecoder(OrcFileFormatFactory.java:88)
> > ~[?:?]
> > at
> > org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
> > ~[flink-table-blink_2.12-1.13.2.jar:1.13.2]
> >
> > 2. I managed to put hadoop common libs on the classpath by this maven
> > setup:
> >
> > <dependencies>
> >   <dependency>
> >     <groupId>org.apache.flink</groupId>
> >     <artifactId>flink-orc_${scala.binary.version}</artifactId>
> >     <version>${flink.version}</version>
> >     <exclusions>
> >       <exclusion>
> >         <groupId>org.apache.orc</groupId>
> >         <artifactId>orc-core</artifactId>
> >       </exclusion>
> >     </exclusions>
> >   </dependency>
> >   <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-core</artifactId>
> >     <version>1.5.6</version>
> >   </dependency>
> >   <dependency>
> >     <groupId>org.apache.orc</groupId>
> >     <artifactId>orc-shims</artifactId>
> >     <version>1.5.6</version>
> >   </dependency>
> >   <dependency>
> >     <groupId>net.java.dev.jets3t</groupId>
> >     <artifactId>jets3t</artifactId>
> >     <version>0.9.0</version>
> >   </dependency>
> > </dependencies>
> >
> > Now the job is accepted by the JobManager, but execution fails due to a
> > lack of AWS credentials:
> > Caused by: java.lang.IllegalArgumentException: AWS Access Key ID and
> > Secret Access Key must be specified as the username or password
> > (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or
> > fs.s3.awsSecretAccessKey properties (respectively).
> > at
> > org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
> > at
> > org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:92)
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> > at
> > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> > Source)
> > at
> > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> > Source)
> > at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> > at com.sun.proxy.$Proxy76.initialize(Unknown Source)
> > at
> > org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:92)
> > at
> > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2433)
> > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:88)
> > at
> > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2467)
> > at org.apache.hadoop.fs.FileSystem$Cache.get(F

PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Kamil ty
Hello,

I'm trying to save my data stream to an Avro file on HDFS. In Flink
documentation I can only see explanations for Java/Scala. However, I can't
seem to find a way to do it in PyFlink. Is this possible to do in PyFlink
currently?

Kind Regards
Kamil


Re: Upgrading from Flink on YARN 1.9 to 1.11

2021-08-17 Thread David Morávek
Hi Andreas,

the problem here is that the command you're using starts a per-job
cluster (which is obvious from the used deployment method
"YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster`
flag is deprecated and no longer supported; I think this is something we
should remove completely in the near future. Also, this flag was always
supposed to start your job in per-job mode, but unfortunately in older
versions this was kind of simulated using a session cluster, so I'd say it
just worked by accident (an "undocumented bug / feature").

What you really want to do is start a session cluster upfront and then use
the `yarn-session` deployment target (where you need to provide the YARN
application id so Flink can find the active JobManager). This is well
documented in the YARN section of the docs [1].
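A rough sketch of that flow on the CLI (Flink 1.11+; the application id is a
placeholder):

```
# start a long-running session cluster on YARN (detached)
./bin/yarn-session.sh -d

# submit the job against that session
./bin/flink run -t yarn-session \
    -Dyarn.application.id=application_XXXXXXXXXX_YYYY \
    ./path/to/your-job.jar
```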

Can you please try this approach and let me know if it helps?

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] <
andreas.ha...@gs.com> wrote:

> Hi David,
>
>
>
> You’re correct about classpathing problems – thanks for your help in
> spotting them. I was able to get past that exception by removing some
> conflicting packages in my shaded JAR, but I’m seeing something else that’s
> interesting. With the 2 threads trying to submit jobs, one of the threads
> is able to submit and process data successfully, while the other thread fails.
>
>
>
> Log snippet:
>
> 2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster
> specification: ClusterSpecification{masterMemoryMB=4096,
> taskManagerMemoryMB=18432, slotsPerTaskManager=2}
>
> 2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins
> directory [plugins] does not exist.
>
> 2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding
> delegation token to the AM container.
>
> 2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created
> HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
>
> 2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for
> hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536,
> Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp)
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain
> Kerberos security token for HBase
>
> 2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available
> (not packaged with this application): ClassNotFoundException :
> "org.apache.hadoop.hbase.HBaseConfiguration".
>
> 2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job
> graph to local resource fail.
>
> 2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor -
> Submitting application master application_1628992879699_11275
>
> 2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running
> data flow for thread-2
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Could not
> deploy Yarn job cluster.
>
> at
> org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
>
> at
> org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
>
> at
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
>
> at
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
>
> at
> com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
>
> ...
>
> Caused by: java.io.IOException: Filesystem closed
>
> at
> org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
>
> at
> org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
>
> ...
>
> 2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline
> service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted
> application application_1628992879699_11275
>
> 2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting
> for the cluster to be allocated
>
> 2021-08-1

Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread Rion Williams
Hi Caizhi,

I don’t mind the request being synchronous (or not using the Async I/O
connectors). Assuming I go down that route, would this be the appropriate way
to handle it: specifically, creating an HttpClient on a keyed stream and
storing the result in state when the state is empty?

It makes sense to me; I'm just wondering if there are any gotchas or
recommendations in terms of a client that might support things like retries,
and whether this is a good pattern to accomplish this.

Thanks,

Rion

> On Aug 16, 2021, at 11:57 PM, Caizhi Weng  wrote:
> 
> 
> Hi!
> 
> As you mentioned, the configuration fetching is very infrequent, so why
> don't you use a blocking approach to send HTTP requests and receive
> responses? This seems like a more reasonable solution to me.
> 
Rion Williams wrote on Tue, Aug 17, 2021 at 4:00 AM:
>> Hi all,
>> 
>> I've been exploring a few different options for storing tenant-specific 
>> configurations within Flink state based on the messages I have flowing 
>> through my job. Initially I had considered creating a source that would 
>> periodically poll an HTTP API and connect that stream to my original event 
>> stream.
>> 
>> However, I realized that this configuration information would basically
>> never change, and thus it doesn't quite make sense to poll so frequently. My
>> next approach would be a function that is keyed (by tenant) and stores the
>> configuration for that tenant in state (issuing an HTTP call when I don't
>> have it yet). Something like this:
>> 
>> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>>     // Tenant specific configuration
>>     private lateinit var httpClient: HttpClient
>>     private lateinit var configuration: ValueState<JsonObject>
>>
>>     override fun open(parameters: Configuration) {
>>         super.open(parameters)
>>         httpClient = HttpClient.newHttpClient()
>>     }
>>
>>     override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
>>         if (configuration.value() == null) {
>>             // Issue a request to the appropriate API to load the configuration
>>             val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>>             httpClient.send(..., {
>>                 // Store the configuration info within state here
>>                 configuration.update(...)
>>             })
>>
>>             out.collect(message)
>>         } else {
>>             // Get the configuration information and pass it downstream to be used by the sink
>>             out.collect(message)
>>         }
>>     }
>> }
>> I didn't see any support for using the Async I/O functions from a keyed 
>> context, otherwise I'd imagine that would be ideal. The requests themselves 
>> should be very infrequent (initial call per tenant) and I'd imagine after 
>> that the necessary configuration could be pulled/stored within the state for 
>> that key.
>> 
>> Is there a good way of handling this that I might be overlooking with an 
>> existing Flink construct or function? I'd love to be able to leverage the 
>> Async I/O connectors as they seem pretty well thought out.
>> 
>> Thanks in advance!
>> 
>> Rion
>> 
>> 


Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Hongbo Miao
Hi Flink friends,

I recently have a question about how to set TTL to make Redis keys expire in 
flink-connector-redis.
I originally posted at Stack Overflow at 
https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis

Then I found there is a pull request that added this feature about 2 years ago:
https://github.com/apache/bahir-flink/pull/66
However, it didn’t get released, which was confirmed by David on Stack Overflow.

I opened a ticket requesting a release at
https://issues.apache.org/jira/browse/BAHIR-279
Please let me know if there is a better way to request this. Thanks!

Best
Hongbo
www.hongbomiao.com


Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
Thanks Yangze, indeed, I see the following in the log about 10s before the
final crash (masked some sensitive data using `MASKED`):

2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MASKED] WARN
org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to
cancelling signal for 30 seconds, but is stuck in method:
 java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
Source)
java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown
Source)
java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown
Source)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
java.base@11.0.11/java.lang.Thread.run(Unknown Source)

2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR
org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error
occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
within 180 + seconds.
  at
org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
  at java.base/java.lang.Thread.run(Unknown Source)



On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:

> Hi, Abhishek,
>
> Do you see something like "Fatal error occurred while executing the
> TaskManager" in your log or would you like to provide the whole task
> manager log?
>
> Best,
> Yangze Guo
>
> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai 
> wrote:
> >
> > Hello,
> >
> > In our production environment, running Flink 1.13 (Scala 2.11), where
> > Flink had been working without issues with a dozen or so jobs for a while,
> > the Flink taskmanager started crash looping with a period of ~4 minutes
> > per crash.  The stack trace is not very informative, so I'm reaching out
> > for help; see below.
> >
> > The only other thing that's unusual is that, due to what might be a
> > product issue (custom job code running on Flink), some or all of our tasks
> > are also in a crash loop.  Still, I wasn't expecting the taskmanager itself
> > to die.  Does the taskmanager have some built-in feature to crash if
> > all/most tasks are crashing?
> >
> > 2021-08-16 15:58:23.984 [main] ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating
> TaskManagerRunner with exit code 1.
> > org.apache.flink.util.FlinkException: Unexpected failure during runtime
> of TaskManagerRunner.
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
> >   at java.base/java.security.AccessController.doPrivileged(Native Method)
> >   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
> >   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
> >   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:413)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:396)
> >   at
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:354)
> > Caused by: java.util.concurrent.TimeoutException: null
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
> >   at
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
> >   at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
> >   at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source)
> >   at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> >   at
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source)
> >   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> >   at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> >   at java.base/java.lang.Thread.run(Unknown Source)
> > 2021-08-16 15:58:23.986 [TaskExecutorLocalStateStoresManager shutdown
> hook] INFO  o.a.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Shutting down TaskExecutorLocalStateStoresManager.
> >
> >
>

Re: Flink taskmanager in crash loop

2021-08-17 Thread Abhishek Rai
Before these messages, there is the following message in the log:

2021-08-12 23:02:58.015 [Canceler/Interrupts for Source: MASKED])
(1/1)#29103' did not react to cancelling signal for 30 seconds, but is
stuck in method:
 java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.11/java.util.concurrent.locks.LockSupport.parkNanos(Unknown
Source)
java.base@11.0.11/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
Source)
app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
java.base@11.0.11/java.lang.Thread.run(Unknown Source)

On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai  wrote:

> Thanks Yangze, indeed, I see the following in the log about 10s before the
> final crash (masked some sensitive data using `MASKED`):
>
> 2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MASKED] WARN
> org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to
> cancelling signal for 30 seconds, but is stuck in method:
>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
> java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown
> Source)
> java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown
> Source)
>
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>
> app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>
> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error
> occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully
> within 180 + seconds.
>   at
> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
>   at java.base/java.lang.Thread.run(Unknown Source)
>
>
>
> On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:
>
>> Hi, Abhishek,
>>
>> Do you see something like "Fatal error occurred while executing the
>> TaskManager" in your log or would you like to provide the whole task
>> manager log?
>>
>> Best,
>> Yangze Guo
>>
>> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai 
>> wrote:
>> >
>> > Hello,
>> >
>> > In our production environment, running Flink 1.13 (Scala 2.11), where
>> > Flink had been working without issues with a dozen or so jobs for a while,
>> > the Flink taskmanager started crash looping with a period of ~4 minutes
>> > per crash.  The stack trace is not very informative, so I'm reaching out
>> > for help; see below.
>> >
>> > The only other thing that's unusual is that, due to what might be a
>> > product issue (custom job code running on Flink), some or all of our tasks
>> > are also in a crash loop.  Still, I wasn't expecting the taskmanager itself
>> > to die.  Does the taskmanager have some built-in feature to crash if
>> > all/most tasks are crashing?
>> >
>> > 2021-08-16 15:58:23.984 [main] ERROR
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Terminating
>> TaskManagerRunner with exit code 1.
>> > org.apache.flink.util.FlinkException: Unexpected failure during runtime
>> of TaskManagerRunner.
>> >   at
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:382)
>> >   at
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$3(TaskManagerRunner.java:413)
>> >   at java.base/java.security.AccessController.doPrivileged(Native
>> Method)
>> >   at java.base/javax.security.auth.Subject.doAs(Unknown Source)
>> >   at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>> >   at
>> org.apache.flink.runtime.security.contexts.HadoopSecurityC

Re: Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Austin Cawley-Edwards
Hi Hongbo,

Thanks for your interest in the Redis connector! I'm not entirely sure what
the release process is like for Bahir, but I've pulled in @Robert Metzger
 who has been involved in the project in the past and
can give an update there.

Best,
Austin

On Tue, Aug 17, 2021 at 10:41 AM Hongbo Miao 
wrote:

> Hi Flink friends,
>
> I recently have a question about how to set TTL to make Redis keys expire
> in flink-connector-redis.
> I originally posted at Stack Overflow at
> https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis
>
> Then I found there is a pull request that added this feature about 2 years
> ago: https://github.com/apache/bahir-flink/pull/66
> However, it didn’t get released, which was confirmed by David on Stack
> Overflow.
>
> I opened a ticket requesting a release at
> https://issues.apache.org/jira/browse/BAHIR-279
> Please let me know if there is a better way to request this. Thanks!
>
> Best
> Hongbo
> www.hongbomiao.com
>


Task Managers having trouble registering after restart

2021-08-17 Thread Kevin Lam
Hi all,

I'm observing an issue, which has been hard to reproduce, where task managers
are sometimes not able to register with the Flink cluster. We provision only
the number of task managers required to run a given application, so the
absence of any of them causes the job to enter a crash loop where it fails to
get the required task slots.

The failure occurs after a job has been running for a while and there have
been job and task manager restarts. We run in Kubernetes, so pod disruptions
occur from time to time; however, we're running with the high availability
setup [0].

Has anyone encountered this before? Any suggestions?

Below are some error messages pulled from the task managers failing to
re-register.

```
] - Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,112 INFO
 org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Starting DefaultLeaderElectionService with
KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-restserver-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-resourcemanager-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-dispatcher-leader.
2021-08-16 13:15:10,211 INFO
 org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
- Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
2021-08-16 13:16:26,103 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
2021-08-16 13:16:30,978 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
```

```
2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
   [] - Uncaught exception in thread
'kafka-producer-network-thread |
trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
~[?:?]
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
~[?:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
~[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.NetworkClient$1
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
... 6 more
```

```
connection to [null] failed with java.net.ConnectException: Connection
refused: flink-jobmanager/10.28.65.100:6123
2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
[] - Association with remote system
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@flink-jobmanager:6123]] Caused by:
[java.net.ConnectException: Connection refused: flink-jobmanager/
10.28.65.100:6123]
2021-08-16 13:14:59,669 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
retrying in 1 ms: Could not connect to rpc endpoint under address
akka.tcp:

Re: Can we release new flink-connector-redis? Thanks!

2021-08-17 Thread Hongbo Miao
Really appreciate it, Austin!

Hongbo
On Aug 17, 2021, 10:33 -0700, Austin Cawley-Edwards wrote:
> Hi Hongbo,
>
> Thanks for your interest in the Redis connector! I'm not entirely sure what 
> the release process is like for Bahir, but I've pulled in @Robert Metzger who 
> has been involved in the project in the past and can give an update there.
>
> Best,
> Austin
>
> > On Tue, Aug 17, 2021 at 10:41 AM Hongbo Miao  
> > wrote:
> > > Hi Flink friends,
> > >
> > > I recently have a question about how to set TTL to make Redis keys expire 
> > > in flink-connector-redis.
> > > I originally posted at Stack Overflow at 
> > > https://stackoverflow.com/questions/68795044/how-to-set-ttl-to-make-redis-keys-expire-in-flink-connector-redis
> > >
> > > Then I found there is a pull request that added this feature about 2 years
> > > ago: https://github.com/apache/bahir-flink/pull/66
> > > However, it didn’t get released, which was confirmed by David on Stack
> > > Overflow.
> > >
> > > I opened a ticket requesting a release at
> > > https://issues.apache.org/jira/browse/BAHIR-279
> > > Please let me know if there is a better way to request this. Thanks!
> > >
> > > Best
> > > Hongbo
> > > www.hongbomiao.com


Re: redis sink from flink

2021-08-17 Thread Jin Yi
great, thanks for the pointers everyone.

i'm going to pursue rolling my own built around lettuce since it seems more
feature-full wrt async semantics.

On Mon, Aug 16, 2021 at 7:21 PM Yik San Chan 
wrote:

> By the way, this post (in Chinese) shows exactly how we do it, with code.
>
> https://yiksanchan.com/posts/flink-bulk-insert-redis
>
> And yes, it has buffered-writes support by leveraging Flink operator state
> and Redis pipelining. Feel free to reach out if you have any questions.
>
>
> On Tue, Aug 17, 2021 at 10:15 AM Yik San Chan 
> wrote:
>
>> Hi Jin,
>>
>> I was in the same boat. I tried the bahir redis connector at first, then
>> felt it was very limited, so I rolled my own. It was actually quite
>> straightforward.
>>
>> All you need to do is to extend RichSinkFunction, then put your logic
>> inside. Regarding Redis clients, Jedis (https://github.com/redis/jedis)
>> is quite popular and simple to get started with.
>>
>> Let me know if you'd like to learn more details about our implementation.
>>
>> Best,
>> Yik San
>>
>> On Tue, Aug 17, 2021 at 9:15 AM Jin Yi  wrote:
>>
>>> is apache bahir still a thing?  it hasn't been touched for months (since
>>> redis 2.8.5).
>>>
>>> as such, looking at the current flink connector docs, it's no longer
>>> pointing to anything from the bahir project.  looking around in either the
>>> flink or bahir newsgroups doesn't turn up anything regarding bahir's EOL.
>>>
>>> is the best bet for a flink to redis sink something i roll on my own
>>> (inclined to go this route w/ buffered writes)?  or should i try going
>>> through via kafka and using confluent's kafka redis connector (flink =>
>>> kafka => redis)?
>>>
>>
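Following Yik San's suggestion above, a minimal sketch of such a sink built on
RichSinkFunction and Jedis (the host, port, TTL, and string key/value types
are assumptions for illustration):

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class SimpleRedisSink extends RichSinkFunction<Tuple2<String, String>> {
    private transient JedisPool pool;

    @Override
    public void open(Configuration parameters) {
        // One pool per parallel sink instance
        pool = new JedisPool("localhost", 6379);
    }

    @Override
    public void invoke(Tuple2<String, String> value, Context context) {
        try (Jedis jedis = pool.getResource()) {
            // SET with a TTL (here: one hour)
            jedis.setex(value.f0, 3600, value.f1);
        }
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.close();
        }
    }
}
```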


Re: flink not able to get scheme for S3

2021-08-17 Thread tarun joshi
Thanks Chesnay! That helped me resolve the issue.


On Fri, 6 Aug 2021 at 04:31, Chesnay Schepler  wrote:

> The reason this doesn't work is that your application works directly
> against Hadoop.
> The filesystems in the plugins directory are only loaded via specific
> code-paths, specifically when the Flink FileSystem class is used.
> Since you are using Hadoop directly you are side-stepping the plugin
> mechanism.
>
> So you have to make sure that Hadoop + Hadoop's S3 filesystem is available
> to the client.
>
> On 06/08/2021 08:02, tarun joshi wrote:
>
> Hey All,
>
> I am running flink in docker containers (image Tag
> :flink:scala_2.11-java11) on EC2 and getting exception as I am trying to
> submit a job through the local ./opt/flink/bin
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: No FileSystem for scheme "s3"
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
> at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
> at
> org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
> at
> org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 8 more
>
> This is the way I am invoking the Flink built-in S3 plugins for the
> JobManager and TaskManager:
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> This is how I am defining dependencies in my pom.xml (I am working from
> the flink-examples project in the Flink GitHub repo):
>
> <dependencies>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-java</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-scala_${scala.binary.version}</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>     <version>${project.version}</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-avro</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-column</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.parquet</groupId>
>     <artifactId>parquet-hadoop</artifactId>
>     <version>1.12.0</version>
>   </dependency>
>   <dependency>
>     <groupId>org.apache.hadoop</groupId>
>     <artifactId>hadoop-common</artifactId>
>     <version>3.3.1</version>
>   </dependency>
> </dependencies>
>
> I am also able to see the plugins being loaded for the JobManager and
> TaskManager:
>
> Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
> Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-presto-1.13.1.jar
>
> Let me if I a
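To illustrate Chesnay's point, a minimal sketch of reading through Flink's own
FileSystem abstraction, which is the code path the plugin mechanism covers
(the bucket path is a placeholder):

```
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ReadViaFlinkFs {
    public static void main(String[] args) throws Exception {
        // Resolving the filesystem via Flink's Path goes through the plugin
        // mechanism, so s3:// works once the flink-s3-fs-* plugin is enabled.
        Path path = new Path("s3://my-bucket/some-object");
        FileSystem fs = path.getFileSystem();
        try (FSDataInputStream in = fs.open(path)) {
            byte[] buf = new byte[4096];
            int n = in.read(buf);
            System.out.println("read " + n + " bytes");
        }
    }
}
```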

flink Kinesis Consumer Connected but not consuming

2021-08-17 Thread tarun joshi
Hey All,

I am running flink in docker containers (image Tag
:flink:scala_2.11-java11) on EC2.

I am able to connect with the Kinesis connector, but nothing is being consumed.

My commands to start the JobManager and TaskManager:

docker run \
  --rm \
  --volume /root/:/root/ \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=jobmanager \
  --network flink-network \
  --publish 8081:8081 \
  flink:scala_2.11-java11 jobmanager &

docker run \
  --rm \
  --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
  --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
  --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
  --name=taskmanager_0 \
  --network flink-network \
  flink:scala_2.11-java11 taskmanager &

2021-08-17 22:38:01,106 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='',
shard='{ShardId: shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
34028236692093846346337460743176821144},SequenceNumberRange:
{StartingSequenceNumber: 49600280467722672235426674687631661510244124728928239618,}}'},
starting state set as sequence number LATEST_SEQUENCE_NUM

... and the following for each shard consumer:

2021-08-17 22:38:01,107 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher
[] - Subtask 0 will start consuming seeded shard
StreamShardHandle{streamName='web-clickstream', shard='{ShardId:
shardId-,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
34028236692093846346337460743176821144},SequenceNumberRange:
{StartingSequenceNumber:
49600280467722672235426674687631661510244124728928239618,}}'} from sequence
number LATEST_SEQUENCE_NUM with ShardConsumer 0

My program is a simple test of a DataStream from Kinesis:

FlinkKinesisConsumer<String> kinesisConsumer =
        new FlinkKinesisConsumer<>(
                "", new SimpleStringSchema(),
                getKafkaConsumerProperties());
env.addSource(kinesisConsumer).print();

env.execute("Read files in streaming fashion");
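One detail worth noting in the log above: the shards are seeded with
LATEST_SEQUENCE_NUM, so only records that arrive after the consumer starts
will be read. A sketch of consumer properties that read from the beginning of
the stream instead (the region value is a placeholder):

```
Properties props = new Properties();
props.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// TRIM_HORIZON reads from the oldest available record; the default
// LATEST only picks up records that arrive after the job starts.
props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
```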

Other Facts:


   1. I can see data flowing into our Kinesis stream continuously from the
   AWS Monitoring tab.
   2. I was facing authorization issues accessing Kinesis in our AWS infra,
   but I resolved that by moving into the same security group as the Kinesis
   deployment and creating a role with full access to Kinesis.


Any pointers are really appreciated!

Thanks,
Tarun


Re: Handling HTTP Requests for Keyed Streams

2021-08-17 Thread JING ZHANG
Hi Rion,
Your solution is good.

It seems that you need to enrich a stream with data queried from an external
HTTP service. There is another solution for reference, similar to the
mechanism of lookup join in Flink SQL.
Lookup join in Flink SQL supports two modes: async mode and sync mode.
For each input record from the original source, it looks up keys in the
dimension table.
To avoid frequent external I/O, some dimension table sources use an in-memory
cache. E.g. the HBase dimension table source uses an in-memory LRU cache of
recently used values: if the input record hits the cache, the external I/O is
avoided; otherwise an external call is triggered and the result value is
cached in the LRU cache.
E.g. the Hive dimension table source loads all data into an in-memory cache,
which is refreshed regularly at the specified interval.
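For reference, a sketch of the SQL form with hypothetical table and column
names:

```
-- Enrich each event with tenant configuration at processing time
SELECT e.*, c.config_json
FROM events AS e
JOIN tenant_config FOR SYSTEM_TIME AS OF e.proc_time AS c
  ON e.tenant_id = c.tenant_id;
```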

Hope the information is helpful.

Best,
JING ZHANG


Rion Williams wrote on Tue, Aug 17, 2021 at 9:23 PM:

> Hi Caizhi,
>
> I don’t mind the request being synchronous (or not using the Async I/O
> connectors). Assuming I go down that route, would this be the appropriate
> way to handle it: specifically, creating an HttpClient on a keyed stream
> and storing the result in state when the state is empty?
>
> It makes sense to me; I'm just wondering if there are any gotchas or
> recommendations in terms of a client that might support things like
> retries, and whether this is a good pattern to accomplish this.
>
> Thanks,
>
> Rion
>
> On Aug 16, 2021, at 11:57 PM, Caizhi Weng  wrote:
>
> 
> Hi!
>
> As you mentioned, the configuration fetching is very infrequent, so why
> don't you use a blocking approach to send HTTP requests and receive
> responses? This seems like a more reasonable solution to me.
>
> Rion Williams wrote on Tue, Aug 17, 2021 at 4:00 AM:
>
>> Hi all,
>>
>> I've been exploring a few different options for storing tenant-specific
>> configurations within Flink state based on the messages I have flowing
>> through my job. Initially I had considered creating a source that would
>> periodically poll an HTTP API and connect that stream to my original event
>> stream.
>>
>> However, I realized that this configuration information would basically
>> never change, and thus it doesn't quite make sense to poll so frequently. My
>> next approach would be a function that is keyed (by tenant) and stores the
>> configuration for that tenant in state (issuing an HTTP call when I don't
>> have it yet). Something like this:
>>
>> class ConfigurationLookupFunction: KeyedProcessFunction<String, JsonObject, JsonObject>() {
>>     // Tenant specific configuration
>>     private lateinit var httpClient: HttpClient
>>     private lateinit var configuration: ValueState<JsonObject>
>>
>>     override fun open(parameters: Configuration) {
>>         super.open(parameters)
>>         httpClient = HttpClient.newHttpClient()
>>     }
>>
>>     override fun processElement(message: JsonObject, context: Context, out: Collector<JsonObject>) {
>>         if (configuration.value() == null) {
>>             // Issue a request to the appropriate API to load the configuration
>>             val url = HttpRequest.newBuilder(URI.create(".../${context.currentKey}")).build()
>>             httpClient.send(..., {
>>                 // Store the configuration info within state here
>>                 configuration.update(...)
>>             })
>>
>>             out.collect(message)
>>         } else {
>>             // Get the configuration information and pass it downstream to be used by the sink
>>             out.collect(message)
>>         }
>>     }
>> }
>>
>> I didn't see any support for using the Async I/O functions from a keyed
>> context, otherwise I'd imagine that would be ideal. The requests themselves
>> should be very infrequent (initial call per tenant) and I'd imagine after
>> that the necessary configuration could be pulled/stored within the state
>> for that key.
>>
>> Is there a good way of handling this that I might be overlooking with an
>> existing Flink construct or function? I'd love to be able to leverage the
>> Async I/O connectors as they seem pretty well thought out.
>>
>> Thanks in advance!
>>
>> Rion
>>
>>
>>


Re: Flink taskmanager in crash loop

2021-08-17 Thread Yangze Guo
> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error
> occurred while executing the TaskManager. Shutting it down...
> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
> within 180 + seconds.

It seems the Task 'MASKED' cannot be terminated within the timeout. I
think this would be the root cause of the TaskManager's termination. We
need to find out why Task 'MASKED' was canceled. Can you provide some
logs related to it? Maybe you can search for "CANCELING" in the jm and tm
logs.
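For example (the log file names are placeholders):

```
grep -n "CANCELING" jobmanager.log taskmanager.log
```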

Best,
Yangze Guo

On Wed, Aug 18, 2021 at 1:20 AM Abhishek Rai  wrote:
>
> Before these messages, there is the following message in the log:
>
> 2021-08-12 23:02:58.015 [Canceler/Interrupts for Source: MASKED]) 
> (1/1)#29103' did not react to cancelling signal for 30 seconds, but is stuck 
> in method:
>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
> java.base@11.0.11/java.util.concurrent.locks.LockSupport.parkNanos(Unknown 
> Source)
> java.base@11.0.11/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
>  Source)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:341)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
> app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:661)
> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>
> On Tue, Aug 17, 2021 at 9:22 AM Abhishek Rai  wrote:
>>
>> Thanks Yangze, indeed, I see the following in the log about 10s before the 
>> final crash (masked some sensitive data using `MASKED`):
>>
>> 2021-08-16 15:58:13.985 [Canceler/Interrupts for Source: MASKED] WARN
>> org.apache.flink.runtime.taskmanager.Task  - Task 'MASKED' did not react to 
>> cancelling signal for 30 seconds, but is stuck in method:
>>  java.base@11.0.11/jdk.internal.misc.Unsafe.park(Native Method)
>> java.base@11.0.11/java.util.concurrent.locks.LockSupport.park(Unknown Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
>>  Source)
>> java.base@11.0.11/java.util.concurrent.ForkJoinPool.managedBlock(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.waitingGet(Unknown 
>> Source)
>> java.base@11.0.11/java.util.concurrent.CompletableFuture.join(Unknown Source)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:705)
>> app//org.apache.flink.streaming.runtime.tasks.SourceStreamTask.cleanUpInvoke(SourceStreamTask.java:186)
>> app//org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:637)
>> app//org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
>> app//org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> java.base@11.0.11/java.lang.Thread.run(Unknown Source)
>>
>> 2021-08-16 15:58:13.986 [Cancellation Watchdog for Source: MASKED] ERROR 
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  - Fatal error 
>> occurred while executing the TaskManager. Shutting it down...
>> org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully 
>> within 180 + seconds.
>>   at 
>> org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1718)
>>   at java.base/java.lang.Thread.run(Unknown Source)
>>
>>
>>
>> On Mon, Aug 16, 2021 at 7:05 PM Yangze Guo  wrote:
>>>
>>> Hi, Abhishek,
>>>
>>> Do you see something like "Fatal error occurred while executing the
>>> TaskManager" in your log or would you like to provide the whole task
>>> manager log?
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Tue, Aug 17, 2021 at 5:17 AM Abhishek Rai  wrote:
>>> >
>>> > Hello,
>>> >
>>> > In our production environment, running Flink 1.13 (Scala 2.11), where
>>> > Flink had been working without issues with a dozen or so jobs for a
>>> > while, the Flink taskmanager started crash looping with a period of ~4
>>> > minutes per crash.  The stack trace is not very informative, so I'm
>>> > reaching out for help; see below.
>>> >
>>> > The only other thing that's unusual is that, due to what might be a
>>> > product issue (custom job code running on Flink), some or all of our
>>> > tasks are also in a crash loop.  Still, I wasn't expecting the
>>> > taskmanager itself to die.  Does the taskmanager have some built-in
>>> > feature to crash if all/most tasks are crashing?
>>> >
>>> > 2021-08-16 15

Re: PyFlink StreamingFileSink bulk-encoded format (Avro)

2021-08-17 Thread Dian Fu
Hi Kamil,

AFAIK, the Avro format is still not supported for StreamingFileSink in the
Python DataStream API. However, you could convert the DataStream to a
Table [1] and then use all the connectors supported in Table & SQL. In this
case, you could use the FileSystem connector [2] and the Avro format [3]
for your requirements.

Regards,
Dian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/python/datastream/intro_to_datastream_api/#emit-results-to-a-table--sql-sink-connector
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro/

> On Aug 17, 2021, at 4:13 PM, Kamil ty wrote:
> 
> Hello,
> 
> I'm trying to save my data stream to an Avro file on HDFS. In Flink 
> documentation I can only see explanations for Java/Scala. However, I can't 
> seem to find a way to do it in PyFlink. Is this possible to do in PyFlink 
> currently?
> 
> Kind Regards
> Kamil
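Building on Dian's pointers, a minimal PyFlink sketch of that route (the path,
schema, and sample rows are illustrative; the flink-sql-avro jar needs to be
on the classpath):

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# A stand-in for the real stream: two (id, name) records
ds = env.from_collection(
    [(1, 'alice'), (2, 'bob')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

# Convert the stream to a table (columns default to f0, f1)
table = t_env.from_data_stream(ds)

# Filesystem connector + Avro format as the bulk-encoded sink
t_env.execute_sql("""
    CREATE TABLE avro_sink (
        f0 INT,
        f1 STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'hdfs:///tmp/avro-out',
        'format' = 'avro'
    )
""")

table.execute_insert("avro_sink").wait()
```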