Flink - Nifi Connectors - Class not found

2016-11-11 Thread PACE, JAMES
I am running Apache Flink 1.1.3 - Hadoop version 1.2.1 with the NiFi connector. When I run a program with a single NiFi source, I receive the following stack trace in the logs:



2016-11-11 19:28:25,661 WARN  org.apache.flink.client.CliFrontend - Unable to locate custom CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli. Flink is not compiled with support for this class.
java.lang.ClassNotFoundException: org.apache.flink.yarn.cli.FlinkYarnSessionCli
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:195)
        at org.apache.flink.client.CliFrontend.loadCustomCommandLine(CliFrontend.java:1136)
        at org.apache.flink.client.CliFrontend.<init>(CliFrontend.java:128)
2016-11-11 19:28:25,855 INFO  org.apache.flink.client.CliFrontend - Starting Command Line Client (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-11-11 19:28:25,855 INFO  org.apache.flink.client.CliFrontend - Current user: x
2016-11-11 19:28:25,856 INFO  org.apache.flink.client.CliFrontend - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.80-b11
2016-11-11 19:28:25,856 INFO  org.apache.flink.client.CliFrontend - Maximum heap size: 3545 MiBytes
2016-11-11 19:28:25,866 INFO  org.apache.flink.client.CliFrontend - Hadoop version: 1.2.1



It seems like the NiFi connector requires the YARN-enabled version of Flink? Is there a dependency I can add to get over this hurdle?
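
For reference, a job with a single NiFi source is typically wired up roughly as sketched below. This is only a minimal sketch, assuming the flink-connector-nifi artifact matching the Flink and Scala version is on the classpath; the site-to-site URL and port name are placeholders rather than the actual values used here.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket;
import org.apache.flink.streaming.connectors.nifi.NiFiSource;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;

public class NiFiSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Site-to-site client configuration; URL and output port name are placeholders
        SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
                .url("http://nifi-host:8080/nifi")
                .portName("Data for Flink")
                .requestBatchCount(5)
                .buildConfig();

        // NiFiSource emits NiFiDataPacket records pulled from the NiFi output port
        DataStream<NiFiDataPacket> stream = env.addSource(new NiFiSource(clientConfig));
        stream.print();

        env.execute("NiFi source example");
    }
}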



Thanks



Jim



RE: Flink - Nifi Connectors - Class not found

2016-11-14 Thread PACE, JAMES
bin/flink run -c com.att.flink.poc.NifiTest jars/flinkpoc-0.0.1-SNAPSHOT.jar

I have another entry point in this jar that uses readFileStream and that works 
fine.

From: Robert Metzger [mailto:rmetz...@apache.org]
Sent: Sunday, November 13, 2016 12:53 AM
To: user@flink.apache.org
Subject: Re: Flink - Nifi Connectors - Class not found

Hi,

the problem is that Flink's YARN code is not available in the Hadoop 1.2.1 
build.

How do you try to execute the Flink job to trigger this error message?

On Fri, Nov 11, 2016 at 12:23 PM, PACE, JAMES <jp4...@att.com> wrote:

I am running Apache Flink 1.1.3 - Hadoop version 1.2.1 with the NiFi connector. When I run a program with a single NiFi source, I receive the stack trace quoted earlier in this thread in the logs.






It seems like the NiFi connector requires the YARN-enabled version of Flink? Is there a dependency I can add to get over this hurdle?



Thanks



Jim




Taskmanager SSL fails looking for Subject Alternative IP Address

2018-07-12 Thread PACE, JAMES
I have the following SSL configuration for a 3-node HA Flink cluster:

#taskmanager.data.ssl.enabled: false
security.ssl.enabled: true
security.ssl.keystore: /opt/app/certificates/server-keystore.jks
security.ssl.keystore-password: 
security.ssl.key-password: 
security.ssl.truststore: /opt/app/certificates/cacerts
security.ssl.truststore-password: 
security.ssl.verify-hostname: true

The job we're running is the sample WordCount.jar. The running version of Flink is 1.4.0. It's not the latest, but I didn't see anything that suggested updating would solve this issue.

If either security.ssl.verify-hostname is set to false or 
taskmanager.data.ssl.enabled is set to false, everything works fine.

When Flink is run with the configuration above, with SSL fully enabled and security.ssl.verify-hostname: true, the Flink job fails. However, going through the logs, SSL appears fine for Akka, the blob service, and the jobmanager.

The root cause looks to be:
  Caused by: java.security.cert.CertificateException: No subject alternative names matching IP address xxx.xxx.xxx.xxx found
I have tried setting taskmanager.hostname to the FQDN of the host, but that did not change anything.
We don't generate certificates with SAN fields.
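
As a point of comparison, a keystore whose certificate does carry SAN entries (which is what the IP-based check in the error above appears to require) can be generated roughly as follows with keytool; the CN, DNS name, IP address, and passwords are placeholders:

keytool -genkeypair -alias flink -keyalg RSA -keysize 2048 -validity 365 \
  -dname "CN=flink-node-1.example.com" \
  -ext "SAN=dns:flink-node-1.example.com,ip:10.1.2.3" \
  -keystore server-keystore.jks -storepass changeit -keypass changeit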

Any thoughts would be appreciated.

This is the full stack trace:
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Sending the partition request failed.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: Sending the partition request failed.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClient$1.operationComplete(PartitionRequestClient.java:119)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClient$1.operationComplete(PartitionRequestClient.java:111)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
        at org.apache.flink.shaded.netty4.io.netty.channel.PendingWriteQueue.safeFail(PendingWriteQueue.java:252)
        at org.apache.flink.shaded.netty4.io.netty.channel.PendingWriteQueue.removeAndFailAll(PendingWriteQueue.java:112)
        at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1256)
        at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1040)
        at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:934)
        at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:315)
        at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:229)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
        at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1431)
        at sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
        at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
        at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781)
        at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
        at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1114)
        at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.unwrap(Ss

SQL not evaluating correctly

2021-10-29 Thread PACE, JAMES
We are in the process of upgrading from Flink 1.9.3 to 1.13.3. We have noticed that statements with either UPPER(field) or LOWER(field) in the WHERE clause, in combination with an IN list, do not always evaluate correctly.

The following test case highlights this problem.


import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestCase {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TestData testData = new TestData();
        testData.setField1("bcd");
        DataStream<TestData> stream = env.fromElements(testData);
        stream.print();  // To prevent 'No operators' error

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.createTemporaryView("testTable", stream, Schema.newBuilder().build());

        // Fails because abcd is larger than abc
        tableEnvironment.executeSql("select *, '1' as run from testTable WHERE lower(field1) IN ('abcd', 'abc', 'bcd', 'cde')").print();
        // Succeeds because lower was removed
        tableEnvironment.executeSql("select *, '2' as run from testTable WHERE field1 IN ('abcd', 'abc', 'bcd', 'cde')").print();
        // These 4 succeed because the smallest literal is before abcd
        tableEnvironment.executeSql("select *, '3' as run from testTable WHERE lower(field1) IN ('abc', 'abcd', 'bcd', 'cde')").print();
        tableEnvironment.executeSql("select *, '4' as run from testTable WHERE lower(field1) IN ('abc', 'bcd', 'abhi', 'cde')").print();
        tableEnvironment.executeSql("select *, '5' as run from testTable WHERE lower(field1) IN ('cde', 'abcd', 'abc', 'bcd')").print();
        tableEnvironment.executeSql("select *, '6' as run from testTable WHERE lower(field1) IN ('cde', 'abc', 'abcd', 'bcd')").print();
        // Fails because smallest is not first
        tableEnvironment.executeSql("select *, '7' as run from testTable WHERE lower(field1) IN ('cdef', 'abce', 'abcd', 'ab', 'bcd')").print();
        // Succeeds
        tableEnvironment.executeSql("select *, '8' as run from testTable WHERE lower(field1) IN ('ab', 'cdef', 'abce', 'abcdefgh', 'bcd')").print();

        env.execute("TestCase");
    }

    public static class TestData {
        private String field1;

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }
    }
}

The job produces the following output:

Empty set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   2 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   3 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   4 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   5 |
+----+--------+-----+
1 row in set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   6 |
+----+--------+-----+
1 row in set
Empty set
+----+--------+-----+
| op | field1 | run |
+----+--------+-----+
| +I |    bcd |   8 |
+----+--------+-----+
1 row in set

Flink Operator Resources Requests and Limits

2022-07-27 Thread PACE, JAMES
We are currently evaluating the Apache Flink Kubernetes operator (version 1.1.0) to replace the operator that we currently use. Setting the memory and CPU resources sets both the request and the limit for the pod. Previously, we were only setting the request, allowing pods to oversubscribe CPU when needed to handle the burstiness of the traffic that we see into the jobs.
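
For context, the memory and CPU resources referred to here are presumably the ones set on the FlinkDeployment spec, roughly as in this sketch (values are illustrative):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment
spec:
  jobManager:
    # The operator applies these as both the pod's resource requests and limits
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1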

Is there a way to set different CPU values for resource requests and limits, or to omit the limit specification? If not, is this something that would be on the roadmap?

Thanks.

Jim


RE: Flink Operator Resources Requests and Limits

2022-07-27 Thread PACE, JAMES
That does not seem to work.

For instance:
  jobManager:
    podTemplate:
      spec:
        containers:
          - resources:
              requests:
                cpu: "0.5"
                memory: "2048m"
              limits:
                cpu: "2"
                memory: "2048m"

results in a pod like this:
Limits:
  cpu: 1
  memory:  1600Mi
Requests:
  cpu: 1
  memory:  1600Mi

This appears to be overwritten by a default if cpu and memory do not appear in 
the jobManager resources.

Jim

From: Őrhidi Mátyás 
Sent: Wednesday, July 27, 2022 11:16 AM
To: PACE, JAMES 
Cc: user@flink.apache.org
Subject: Re: Flink Operator Resources Requests and Limits

Hi James,

Have you considered using pod templates already?
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/

Regards,
Matyas

On Wed, Jul 27, 2022 at 3:21 PM PACE, JAMES <jp4...@att.com> wrote:
We are currently evaluating the Apache Flink Kubernetes operator (version 1.1.0) to replace the operator that we currently use. Setting the memory and CPU resources sets both the request and the limit for the pod. Previously, we were only setting the request, allowing pods to oversubscribe CPU when needed to handle the burstiness of the traffic that we see into the jobs.

Is there a way to set different CPU values for resource requests and limits, or to omit the limit specification? If not, is this something that would be on the roadmap?

Thanks.

Jim


SQL Changes between 1.14 and 1.15?

2022-10-14 Thread PACE, JAMES
We've noticed the following difference in SQL when upgrading from Flink 1.14.5 to 1.15.2, around characters that are escaped in an SQL statement:

This statement:
  tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\"cd\"e%'");
produces a runtime error in Flink 1.15.2, but executes properly in Flink 1.14.5.

This can be worked around by escaping the backslash, changing the statement to:
  tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\\\"cd\\\"e%'");

This code illustrates the issue:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestCase3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        TestData testData = new TestData();
        testData.setField1("b\"cd\"e");
        DataStream<TestData> stream = env.fromElements(testData);
        stream.print();
        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        tableEnvironment.createTemporaryView("testTable", stream, Schema.newBuilder().build());

        // Works with Flink 1.14.x, flink runtime errors in 1.15.2.  Uncomment to see runtime trace
        //tableEnvironment.executeSql("select *, '1' as run from testTable WHERE lower(field1) LIKE 'b\"cd\"e%'").print();
        // Works with 1.15.2
        tableEnvironment.executeSql("select * from testTable WHERE lower(field1) LIKE 'b\\\"cd\\\"e%'").print();

        env.execute("TestCase");
    }

    public static class TestData {
        private String field1;

        public String getField1() { return field1; }
        public void setField1(String field1) { this.field1 = field1; }
    }
}

Thanks
Jim