no suitable table factory for file sources
Hi,

I can't read from a file source using the sql-client tool. I set up a simple test scenario with the configuration file in [1] and I'm getting the error in [2] when starting the environment with

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib

in the standard Flink 1.9.1 download environment. Reading the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the filesystem connector to be "built in".

I'm getting a similar issue while following the sql-training tutorial from
https://github.com/ververica/sql-training/wiki. There I changed the
sql-client-conf.yaml file used by the Docker container for the SQL client.

Thanks for any hints

Günter

[1]

# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"
      # or use the table's schema
      derive-schema: true
      field-delimiter: ";"          # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"        # optional: line delimiter ("\n" by default;
                                    #   otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"          # optional: quote character for enclosing field values ('"' by default)
      allow-comments: true          # optional: ignores comment lines that start with "#" (disabled by default);
                                    #   if enabled, make sure to also ignore parse errors to allow empty rows
      ignore-parse-errors: true     # optional: skip fields and rows with parse errors instead of failing;
                                    #   fields are set to null in case of errors
      array-element-delimiter: "|"  # optional: the array element delimiter string for separating
                                    #   array and row element values (";" by default)
      escape-character: "\\"        # optional: escape character for escaping values (disabled by default)
      null-literal: "n/a"           # optional: null literal string that is interpreted as a
                                    #   null value (disabled by default)

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone         # only the 'standalone' deployment is supported
  response-timeout: 5000   # general cluster communication timeout in ms
  gateway-address: ""      # (optional) address from cluster to gateway
  gateway-port: 0          # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.

Validating current environment...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
        at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
        ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:
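For readers hitting the same NoMatchingTableFactoryException: judging from the connect.html page linked above, the Flink 1.9 filesystem connector works only with the legacy ("old") CSV format, while the properties used in [1] (line-delimiter, allow-comments, null-literal, ...) belong to the RFC-4180 CSV format that is only wired up for Kafka in 1.9; the format section in [1] also sets both schema and derive-schema, which the docs describe as alternatives, and the table lacks a top-level schema section of its own. Below is a minimal, untested sketch of a table definition following the "Old CSV Format" section of that page - names are taken from [1], the property shapes are my reading of the docs, not a verified fix:

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv              # resolves to the legacy CSV factory when paired with filesystem
      fields:                # ordered format fields replace 'schema'/'derive-schema'
        - name: test
          type: VARCHAR
      field-delimiter: ";"
    schema:                  # the table's own schema is required as well
      - name: test
        type: VARCHAR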
Re: configuration of standalone cluster
swissbib@sb-ust1:~$ java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment (build 11.0.2+9-Ubuntu-3ubuntu118.04.3)
OpenJDK 64-Bit Server VM (build 11.0.2+9-Ubuntu-3ubuntu118.04.3, mixed mode, sharing)
swissbib@sb-ust1:~$

Is version 8 more appropriate?

Günter

On 02.05.19 13:48, Chesnay Schepler wrote:

Which java version are you using?

On 01/05/2019 21:31, Günter Hipler wrote:

Hi,

For the first time I'm trying to set up a standalone cluster. My current configuration: 4 servers (1 jobmanager and 3 taskmanagers).

a) starting the cluster

swissbib@sb-ust1:/swissbib_index/apps/flink/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host sb-ust1.
Starting taskexecutor daemon on host sb-ust2.
Starting taskexecutor daemon on host sb-ust3.
Starting taskexecutor daemon on host sb-ust4.

On the taskmanager side I get the error

2019-05-01 21:16:32,794 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.ssl.tcp://flink@sb-ust1:6123] has failed, address is now gated for [50] ms. Reason: [class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')]
2019-05-01 21:16:41,932 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-05-01 21:17:01,960 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..

Port 6123 is open on the jobmanager, but I haven't created a dedicated flink user.

- Is this necessary? If yes, is it possible to define another user for communication purposes?

I followed the documentation to set up SSL-based communication
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/security-ssl.html#example-ssl-setup-standalone-and-kubernetes)
and created a keystore as described:

keytool -genkeypair -alias swissbib.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass verysecret -keypass verysecret -keyalg RSA -keysize 4096

and deployed the flink-conf.yaml on the whole cluster (part of flink-conf.yaml):

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.truststore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.keystore-password: verysecret
security.ssl.internal.truststore-password: verysecret
security.ssl.internal.key-password: verysecret

But this doesn't solve the problem - still no connection between the taskmanagers and the jobmanager.

- Another question: which ports have to be opened in the firewall for a standalone cluster?

Thanks for any hints!

Günter
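A note on the Java version: Flink 1.8 is built and tested against Java 8, and a failure like "class [B cannot be cast to class [C" inside the akka SSL layer is consistent with running that stack on Java 11. A sketch for pinning the cluster to a Java 8 JDK follows; the package name and path assume Ubuntu 18.04 and are not taken from this thread:

# on every node: install a Java 8 JDK (Ubuntu package name; adjust per distro)
sudo apt-get install openjdk-8-jdk

# conf/flink-conf.yaml: make Flink's start scripts use that JDK explicitly,
# independent of what 'java' on the PATH resolves to
env.java.home: /usr/lib/jvm/java-8-openjdk-amd64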
configuration of standalone cluster
Hi,

For the first time I'm trying to set up a standalone cluster. My current configuration: 4 servers (1 jobmanager and 3 taskmanagers).

a) starting the cluster

swissbib@sb-ust1:/swissbib_index/apps/flink/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host sb-ust1.
Starting taskexecutor daemon on host sb-ust2.
Starting taskexecutor daemon on host sb-ust3.
Starting taskexecutor daemon on host sb-ust4.

On the taskmanager side I get the error

2019-05-01 21:16:32,794 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.ssl.tcp://flink@sb-ust1:6123] has failed, address is now gated for [50] ms. Reason: [class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')]
2019-05-01 21:16:41,932 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-05-01 21:17:01,960 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..

Port 6123 is open on the jobmanager, but I haven't created a dedicated flink user.

- Is this necessary? If yes, is it possible to define another user for communication purposes?

I followed the documentation to set up SSL-based communication
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/security-ssl.html#example-ssl-setup-standalone-and-kubernetes)
and created a keystore as described:

keytool -genkeypair -alias swissbib.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass verysecret -keypass verysecret -keyalg RSA -keysize 4096

and deployed the flink-conf.yaml on the whole cluster (part of flink-conf.yaml):

security.ssl.internal.enabled: true
security.ssl.internal.keystore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.truststore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.keystore-password: verysecret
security.ssl.internal.truststore-password: verysecret
security.ssl.internal.key-password: verysecret

But this doesn't solve the problem - still no connection between the taskmanagers and the jobmanager.

- Another question: which ports have to be opened in the firewall for a standalone cluster?

Thanks for any hints!

Günter
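On the firewall question at the end: besides jobmanager.rpc.port, Flink chooses several ports at random unless they are pinned in flink-conf.yaml. A sketch of the options that, as far as I can tell, matter for a standalone cluster; the concrete port numbers below are arbitrary examples, not defaults:

# conf/flink-conf.yaml: pin the otherwise randomly chosen ports so they
# can be opened in the firewall (port values are examples)
jobmanager.rpc.port: 6123    # JobManager/ResourceManager RPC (already open here)
blob.server.port: 6124       # blob server; distributes job jars to the taskmanagers
taskmanager.rpc.port: 6122   # TaskManager RPC endpoint
taskmanager.data.port: 6121  # data exchange between taskmanagers
rest.port: 8081              # web UI and REST API on the jobmanager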
Re: problems starting the training exercise TaxiRideCleansing on local cluster
Hi Nico,

thanks for looking into it. The reason for the behavior on my system: I had two different JDK versions installed (OpenJDK and Oracle JDK) - I wasn't aware of this because I generally prefer the Oracle JDK. Somehow - I didn't analyze it at greater depth - both versions were used in different scenarios, which seems to have caused the error. After removing OpenJDK completely from my system I could use the local Flink cluster with my application jar.

Sorry for the inconvenience!

Günter

On 10.07.2017 15:43, Nico Kruber wrote:

Hi Günter,

unfortunately, I cannot reproduce your error. This is what I did (following http://training.data-artisans.com/devEnvSetup.html):

* clone and build the flink-training-exercises project:

git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install

* downloaded & extracted Flink 1.3.1 (Hadoop 2.7, Scala 2.10 - but that should not matter)

* ./bin/start-local.sh

* create a development project:

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.3.1 \
  -DgroupId=org.apache.flink.quickstart \
  -DartifactId=flink-java-project \
  -Dversion=0.1 \
  -Dpackage=org.apache.flink.quickstart \
  -DinteractiveMode=false

* add the flink-training-exercises 0.10.0 dependency:

<dependency>
  <groupId>com.data-artisans</groupId>
  <artifactId>flink-training-exercises</artifactId>
  <version>0.10.0</version>
</dependency>

* implement the task (http://training.data-artisans.com/exercises/rideCleansing.html)

* ./bin/flink run -c org.apache.flink.quickstart.TaxiStreamCleansingJob ./flink-java-project/target/flink-java-project-0.1.jar

What I noticed though is that my dependency tree only contains joda-time-2.7.jar, not 2.9.9 as in your case - did you change the dependencies somehow?

mvn clean package
...
[INFO] Including joda-time:joda-time:jar:2.7 in the shaded jar.
...

Could you try with a new development project set up the way above and copy your code into this? If that doesn't help, try with a freshly extracted, unaltered Flink archive.

Nico

On Sunday, 9 July 2017 17:32:25 CEST Günter Hipler wrote:

Thanks for the response. My classpath contains a version:

mvn dependency:build-classpath
[INFO] Scanning for projects...
[INFO]
[INFO] Building Flink Quickstart Job 0.1
[INFO]
[INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flink-java-project ---
[INFO] Dependencies classpath:
togram/2.1.6/HdrHistogram-2.1.6.jar:/home/swissbib/.m2/repository/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar:/home/swissbib/.m2/repository/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar:

which definitely contains the required method
(http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormatter.html#withZoneUTC--).

Something else is going wrong. I guess it's the way I started (or configured) the local cluster, but that was done as described in the training setup (http://training.data-artisans.com/devEnvSetup.html), which is very straightforward.

Günter

On 09.07.2017 16:17, Ted Yu wrote:

Since the exception was about a missing method (withZoneUTC) instead of a class not found, it was likely due to a conflicting joda-time jar being on the classpath.

Cheers

On Sun, Jul 9, 2017 at 1:22 AM, Günter Hipler <guenter.hip...@unibas.ch> wrote:

Hi,

sorry for this newbie question...

I'm following the data artisans exercises and wanted to run the TaxiRide Cleansing job on my local cluster (version 1.3.1)
(http://training.data-artisans.com/exercises/rideCleansing.html).

While this is possible within my IDE, the cluster throws an exception because of a missing type, although the missing type is part of the application jar the cluster is provided with.

swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$ jar tf flink-java-project-0.1.jar | grep DateTimeFormatter
org/elasticsearch/common/joda/FormatDateTimeFormatter.class
org/joda/time/format/DateTimeFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$Composite.class
org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$StringLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$TextField.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneId.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneName.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneOffset.class
org/joda/time/format/DateTimeFormatterBuilder$TwoDigitYear.class
org/joda/time/format/DateTimeFormatterBuilder$UnpaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder.class

Any advice? Thanks!

Günter
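Since the root cause turned out to be two JDKs being picked up in different scenarios, here is a quick sanity check for similar setups; the update-alternatives commands assume a Debian/Ubuntu-style alternatives system:

# which JDK does the shell (and therefore bin/flink) actually resolve?
which java && java -version

# list all registered JDKs and switch the active one (Debian/Ubuntu)
update-alternatives --list java
sudo update-alternatives --config java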
Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release
+1 to drop Java 7 support

On 12.07.2017 16:43, Stephan Ewen wrote:

Hi users!

Flink currently maintains backwards compatibility for savepoint formats, which means that savepoints taken with Flink versions 1.1.x and 1.2.x can be resumed in Flink 1.3.x.

We are discussing how many versions back to support. The proposition is the following:

*Suggestion: Flink 1.4.0 will be able to resume savepoints taken with versions 1.3.x and 1.2.x, but not savepoints from versions 1.1.x and 1.0.x.*

The reason for that is that there is a lot of code mapping between the completely different legacy format (1.1.x, not re-scalable) and the key-group-oriented format (1.2.x onwards, re-scalable). It would greatly help the development of state and checkpointing features to drop that old code.

Please let us know if you have concerns about that.

Best,
Stephan

--
Universität Basel
Universitätsbibliothek
Günter Hipler
Projekt SwissBib
Schoenbeinstrasse 18-20
4056 Basel, Schweiz
Tel.: +41 (0)61 267 31 12
Fax: +41 61 267 3103
E-Mail guenter.hip...@unibas.ch
URL: www.swissbib.org / http://www.ub.unibas.ch/
Re: problems starting the training exercise TaxiRideCleansing on local cluster
Thanks for the response. My classpath contains a version:

mvn dependency:build-classpath
[INFO] Scanning for projects...
[INFO]
[INFO] Building Flink Quickstart Job 0.1
[INFO]
[INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flink-java-project ---
[INFO] Dependencies classpath:
togram/2.1.6/HdrHistogram-2.1.6.jar:/home/swissbib/.m2/repository/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar:/home/swissbib/.m2/repository/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar:

which definitely contains the required method
(http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormatter.html#withZoneUTC--).

Something else is going wrong. I guess it's the way I started (or configured) the local cluster, but that was done as described in the training setup (http://training.data-artisans.com/devEnvSetup.html), which is very straightforward.

Günter

On 09.07.2017 16:17, Ted Yu wrote:

Since the exception was about a missing method (withZoneUTC) instead of a class not found, it was likely due to a conflicting joda-time jar being on the classpath.

Cheers

On Sun, Jul 9, 2017 at 1:22 AM, Günter Hipler <guenter.hip...@unibas.ch> wrote:

Hi,

sorry for this newbie question...

I'm following the data artisans exercises and wanted to run the TaxiRide Cleansing job on my local cluster (version 1.3.1)
(http://training.data-artisans.com/exercises/rideCleansing.html).

While this is possible within my IDE, the cluster throws an exception because of a missing type, although the missing type is part of the application jar the cluster is provided with.

swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$ jar tf flink-java-project-0.1.jar | grep DateTimeFormatter
org/elasticsearch/common/joda/FormatDateTimeFormatter.class
org/joda/time/format/DateTimeFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$Composite.class
org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$StringLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$TextField.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneId.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneName.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneOffset.class
org/joda/time/format/DateTimeFormatterBuilder$TwoDigitYear.class
org/joda/time/format/DateTimeFormatterBuilder$UnpaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder.class

Any advice? Thanks!

Günter

swissbib@ub-sbhp02:/usr/local/swissbib/flink$ bin/flink run -c org.apache.flink.quickstart.StreamingJob /home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/target/flink-java-project-0.1.jar --input /home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/data/nycTaxiRides.gz
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/swissbib/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/swissbib/environment/tools/hbase-1.2.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/swissbib/environment/tools/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 32c7f2d0bbcac4d8c0367639ea928014. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1464375722] with leader session id ----.
07/09/2017 09:31:51 Job execution switched to
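To check Ted's conflicting-jar hypothesis in a setup like this, Maven can show exactly through which dependencies each joda-time version enters the project; the artifact filter is a standard option of the dependency plugin, and the unzip line assumes the shade plugin kept the Maven metadata inside the jar:

# every path through which joda-time enters the project
mvn dependency:tree -Dincludes=joda-time:joda-time

# which version actually landed inside the shaded application jar
unzip -p target/flink-java-project-0.1.jar META-INF/maven/joda-time/joda-time/pom.properties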