no suitable table factory for file sources

2020-02-02 Thread Günter Hipler

Hi,

I can't read from a file source using the sql-client tool.

I just set up a simple test scenario with the configuration file in [1].

I'm getting the error in [2] when starting the environment with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in the standard Flink 1.9.1 download distribution.
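
(For context on how the client classpath is assembled: besides -l for a whole
library directory, sql-client.sh also accepts -j for individual jars, so extra
format/connector jars can be passed explicitly. A hypothetical invocation - the
flink-csv jar is only an example of such an add-on, not a confirmed fix:

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib -j /path/to/flink-csv-1.9.1.jar)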

Reading the documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the filesystem connector to be "built-in".


I ran into a similar issue while following the sql-training tutorial
from https://github.com/ververica/sql-training/wiki. There I changed the
sql-client-conf.yaml file used by the Docker container for the SQL client.


Thanks for any hints

Günter




[1]


# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"
      # or use the table's schema
      derive-schema: true
      field-delimiter: ";"          # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"        # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"          # optional: quote character for enclosing field values ('"' by default)
      allow-comments: true          # optional: ignores comment lines that start with "#" (disabled by default);
                                    #   if enabled, make sure to also ignore parse errors to allow empty rows
      ignore-parse-errors: true     # optional: skip fields and rows with parse errors instead of failing;
                                    #   fields are set to null in case of errors
      array-element-delimiter: "|"  # optional: the array element delimiter string for separating
                                    #   array and row element values (";" by default)
      escape-character: "\\"        # optional: escape character for escaping values (disabled by default)
      null-literal: "n/a"           # optional: null literal string that is interpreted as a
                                    #   null value (disabled by default)


#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming              # 'batch' or 'streaming' execution
  result-mode: table           # 'changelog' or 'table' presentation of results
  parallelism: 1               # parallelism of the program
  max-parallelism: 128         # maximum parallelism
  min-idle-state-retention: 0  # minimum idle state retention in ms
  max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted.

deployment:
  type: standalone         # only the 'standalone' deployment is supported
  response-timeout: 5000   # general cluster communication timeout in ms
  gateway-address: ""      # (optional) address from cluster to gateway
  gateway-port: 0          # (optional) port from cluster to gateway
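
For comparison, here is a pared-down variant of the tables section that sticks
to the properties the built-in (old) CSV format factory should accept, as far
as I can tell from the 1.9 connect page - an untested sketch, with the schema
spelled out explicitly instead of derived:

# untested sketch: minimal filesystem/CSV source for the 1.9 SQL client
tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "file:///home/swissbib/temp/trash/hello.txt"
    format:
      type: csv
      fields:
        - name: test
          type: VARCHAR
      field-delimiter: ";"
    schema:
      - name: test
        type: VARCHAR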

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.

Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:

Re: configuration of standalone cluster

2019-05-02 Thread Günter Hipler

swissbib@sb-ust1:~$ java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment (build 11.0.2+9-Ubuntu-3ubuntu1~18.04.3)
OpenJDK 64-Bit Server VM (build 11.0.2+9-Ubuntu-3ubuntu1~18.04.3, mixed mode, sharing)

swissbib@sb-ust1:~$

Is version 8 more appropriate?
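
If 8 is the way to go, my understanding is that the JVM can be pinned per
Flink installation via env.java.home in conf/flink-conf.yaml instead of
switching the system default - a sketch, where the path is an assumption
based on Ubuntu's usual openjdk-8 package location:

# conf/flink-conf.yaml on all nodes (path assumed for Ubuntu's openjdk-8)
env.java.home: /usr/lib/jvm/java-8-openjdk-amd64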

Günter


On 02.05.19 13:48, Chesnay Schepler wrote:

Which java version are you using?

On 01/05/2019 21:31, Günter Hipler wrote:

Hi,

For the first time I'm trying to set up a standalone cluster. My
current configuration:

4 servers (1 jobmanager and 3 taskmanagers)

a) starting the cluster
swissbib@sb-ust1:/swissbib_index/apps/flink/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host sb-ust1.
Starting taskexecutor daemon on host sb-ust2.
Starting taskexecutor daemon on host sb-ust3.
Starting taskexecutor daemon on host sb-ust4.


On the taskmanager side I get the error:
2019-05-01 21:16:32,794 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.ssl.tcp://flink@sb-ust1:6123] has failed, address is now gated for [50] ms. Reason: [class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')]
2019-05-01 21:16:41,932 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-05-01 21:17:01,960 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..



Port 6123 is allowed on the jobmanager, but I haven't created a
dedicated flink user.

- Is this necessary? If yes, is it possible to define another user
for communication purposes?


I followed the documentation to set up SSL-based communication
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/security-ssl.html#example-ssl-setup-standalone-and-kubernetes)
and created a keystore as described:


keytool -genkeypair -alias swissbib.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass verysecret -keypass verysecret -keyalg RSA -keysize 4096
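
A quick sanity check of the generated store is plain keytool usage, nothing
Flink-specific:

# list the keystore contents; this should show the swissbib.internal entry
# with its 4096-bit RSA key
keytool -list -v -keystore internal.keystore -storepass verysecret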


and deployed the flink-conf.yaml on the whole cluster

(part of flink-conf.yaml)
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.truststore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.keystore-password: verysecret
security.ssl.internal.truststore-password: verysecret
security.ssl.internal.key-password: verysecret

but this doesn't solve the problem - still no connection between the
taskmanagers and the jobmanager.


- Another question: which ports have to be opened in the firewall
for a standalone cluster?
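
From the 1.8 configuration documentation, my understanding is that the ports
which default to 0 (i.e. random) can be pinned in flink-conf.yaml so a
firewall can allow them explicitly. The first two values below are the
documented defaults; the rest are arbitrary picks for illustration:

jobmanager.rpc.port: 6123      # jobmanager RPC (default)
rest.port: 8081                # web UI / REST (default)
blob.server.port: 6124         # blob server, random by default (example value)
taskmanager.rpc.port: 6122     # taskmanager RPC, random by default (example value)
taskmanager.data.port: 6121    # taskmanager data exchange, random by default (example value)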


Thanks for any hints!

Günter







configuration of standalone cluster

2019-05-01 Thread Günter Hipler

Hi,

For the first time I'm trying to set up a standalone cluster. My current
configuration:

4 servers (1 jobmanager and 3 taskmanagers)

a) starting the cluster
swissbib@sb-ust1:/swissbib_index/apps/flink/bin$ ./start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host sb-ust1.
Starting taskexecutor daemon on host sb-ust2.
Starting taskexecutor daemon on host sb-ust3.
Starting taskexecutor daemon on host sb-ust4.


On the taskmanager side I get the error:
2019-05-01 21:16:32,794 WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.ssl.tcp://flink@sb-ust1:6123] has failed, address is now gated for [50] ms. Reason: [class [B cannot be cast to class [C ([B and [C are in module java.base of loader 'bootstrap')]
2019-05-01 21:16:41,932 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..
2019-05-01 21:17:01,960 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor - Could not resolve ResourceManager address akka.ssl.tcp://flink@sb-ust1:6123/user/resourcemanager, retrying in 1 ms: Ask timed out on [ActorSelection[Anchor(akka.ssl.tcp://flink@sb-ust1:6123/), Path(/user/resourcemanager)]] after [1 ms]. Sender[null] sent message of type "akka.actor.Identify"..



Port 6123 is allowed on the jobmanager, but I haven't created a
dedicated flink user.

- Is this necessary? If yes, is it possible to define another user for
communication purposes?


I followed the documentation to set up SSL-based communication
(https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/security-ssl.html#example-ssl-setup-standalone-and-kubernetes)
and created a keystore as described:


keytool -genkeypair -alias swissbib.internal -keystore internal.keystore -dname "CN=flink.internal" -storepass verysecret -keypass verysecret -keyalg RSA -keysize 4096


and deployed the flink-conf.yaml on the whole cluster

(part of flink-conf.yaml)
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.truststore: /swissbib_index/apps/flink/conf/internal.keystore
security.ssl.internal.keystore-password: verysecret
security.ssl.internal.truststore-password: verysecret
security.ssl.internal.key-password: verysecret

but this doesn't solve the problem - still no connection between the
taskmanagers and the jobmanager.


- Another question: which ports have to be opened in the firewall for a
standalone cluster?


Thanks for any hints!

Günter



Re: problems starting the training exercise TaxiRideCleansing on local cluster

2017-07-23 Thread Günter Hipler

Hi Nico,

thanks for looking into it. The reason for the behavior on my system: I
had two different JDK versions installed (OpenJDK and Oracle JDK), which
I wasn't aware of because I generally prefer the Oracle JDK. Somehow (I
didn't analyze it in greater depth) the two versions were used in
different scenarios, which seems to have caused the error. After removing
OpenJDK completely from my system, I could use the local Flink cluster
with my application jar.


Sorry for the inconvenience!

Günter


On 10.07.2017 15:43, Nico Kruber wrote:

Hi Günter,
unfortunately, I cannot reproduce your error. This is what I did (following
http://training.data-artisans.com/devEnvSetup.html):

* clone and build the flink-training-exercises project:
git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean install

* downloaded & extracted flink 1.3.1 (hadoop 2.7, Scala 2.10 - but that should
not matter)
* ./bin/start-local.sh

* create a development project:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.1 \
    -DgroupId=org.apache.flink.quickstart \
    -DartifactId=flink-java-project \
    -Dversion=0.1 \
    -Dpackage=org.apache.flink.quickstart \
    -DinteractiveMode=false
* add the flink-training-exercises 0.10.0 dependency:

<dependency>
    <groupId>com.data-artisans</groupId>
    <artifactId>flink-training-exercises</artifactId>
    <version>0.10.0</version>
</dependency>

* implement the task (http://training.data-artisans.com/exercises/rideCleansing.html)
* ./bin/flink run -c org.apache.flink.quickstart.TaxiStreamCleansingJob ./flink-java-project/target/flink-java-project-0.1.jar

What I noticed though is that my dependency tree only contains
joda-time-2.7.jar, not 2.9.9 as in your case - did you change the
dependencies somehow?

mvn clean package
...
[INFO] Including joda-time:joda-time:jar:2.7 in the shaded jar.
...
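
One way to see where a particular joda-time version comes from is the
standard Maven dependency tree, filtered to that artifact (shown for
illustration):

mvn dependency:tree -Dincludes=joda-time

That prints every dependency path through which joda-time is pulled in, so a
transitively included 2.9.9 would show up immediately.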

Could you try with a new development project set up the way above and copy
your code into this?

If that doesn't help, try with a freshly extracted unaltered flink archive.


Nico


On Sunday, 9 July 2017 17:32:25 CEST Günter Hipler wrote:

Thanks for the response.

My classpath contains the following joda-time version:

  mvn dependency:build-classpath
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flink-java-project ---
[INFO] Dependencies classpath:
togram/2.1.6/HdrHistogram-2.1.6.jar:/home/swissbib/.m2/repository/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar:/home/swissbib/.m2/repository/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar:



which definitely contains the required method
(http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormatter.html#withZoneUTC--).

Something else is going wrong. I suspect the way I started (or
configured) the local cluster - but it was done as described in the
training setup (http://training.data-artisans.com/devEnvSetup.html),
which is very straightforward.

Günter

On 09.07.2017 16:17, Ted Yu wrote:

Since the exception was about a missing method (withZoneUTC) instead
of class not found, it was likely due to a conflicting joda time jar
being on the classpath.

Cheers

On Sun, Jul 9, 2017 at 1:22 AM, Günter Hipler

<guenter.hip...@unibas.ch> wrote:
 Hi,
 
 sorry for this newbie question...
 
 I'm following the data artisans exercises and wanted to run the

 TaxiRide Cleansing job on my local cluster (version 1.3.1)
 (http://training.data-artisans.com/exercises/rideCleansing.html)
 
 While this is possible within my IDE, the cluster throws an
 exception because of a missing type, although the missing type is
 part of the application jar the cluster is provided with.
 
 swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$ jar tf flink-java-project-0.1.jar | grep DateTimeFormatter
 org/elasticsearch/common/joda/FormatDateTimeFormatter.class
 org/joda/time/format/DateTimeFormatter.class
 org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
 org/joda/time/format/DateTimeFormatterBuilder$Composite.class
 org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
 org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
 org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
 org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
 org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
 org/joda/time/format/Da

Re: [POLL] Dropping savepoint format compatibility for 1.1.x in the Flink 1.4.0 release

2017-07-12 Thread Günter Hipler

+1 to drop Java 7 support


On 12.07.2017 16:43, Stephan Ewen wrote:

Hi users!

Flink currently maintains backwards compatibility for savepoint 
formats, which means that savepoints taken with Flink version 1.1.x 
and 1.2.x can be resumed in Flink 1.3.x


We are discussing how many versions back to support. The proposition 
is the following:


Suggestion: Flink 1.4.0 will be able to resume savepoints taken
with version 1.3.x and 1.2.x, but not savepoints from version 1.1.x
and 1.0.x



The reason for that is that there is a lot of code mapping between the 
completely different legacy format (1.1.x, not re-scalable) and the 
key-group-oriented format (1.2.x onwards, re-scalable). It would 
greatly help the development of state and checkpointing features to 
drop that old code.
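
(For anyone checking whether they are affected: this concerns the usual
savepoint round-trip via the CLI - the job id placeholder and paths below are
illustrative, not real:

# take a savepoint of a running job
bin/flink savepoint <jobId> hdfs:///flink/savepoints

# resume from it, possibly on a newer Flink version
bin/flink run -s hdfs:///flink/savepoints/savepoint-xyz ./my-job.jar
)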


Please let us know if you have concerns about that.

Best,
Stephan



--
Universität Basel
Universitätsbibliothek
Günter Hipler
Projekt SwissBib
Schoenbeinstrasse 18-20
4056 Basel, Schweiz
Tel.: +41 (0)61 267 31 12 Fax: +41 61 267 3103
E-Mail guenter.hip...@unibas.ch
URL: www.swissbib.org  / http://www.ub.unibas.ch/



Re: problems starting the training exercise TaxiRideCleansing on local cluster

2017-07-09 Thread Günter Hipler

Thanks for the response.

My classpath contains the following joda-time version:

 mvn dependency:build-classpath
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:build-classpath (default-cli) @ flink-java-project ---
[INFO] Dependencies classpath:
togram/2.1.6/HdrHistogram-2.1.6.jar:/home/swissbib/.m2/repository/com/twitter/jsr166e/1.1.0/jsr166e-1.1.0.jar:/home/swissbib/.m2/repository/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar:



which definitely contains the required method
(http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormatter.html#withZoneUTC--).
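
A JDK-only way to double-check that the shaded jar really ships the method
(illustrative; the jar path is the one from the build above):

# javap lists the member signatures of the class as packaged in the jar
javap -classpath target/flink-java-project-0.1.jar org.joda.time.format.DateTimeFormatter | grep withZoneUTC

If the method shows up there but the cluster still fails, another joda-time
jar on the cluster's classpath must be winning at runtime.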


Something else is going wrong. I suspect the way I started (or
configured) the local cluster - but it was done as described in the
training setup (http://training.data-artisans.com/devEnvSetup.html),
which is very straightforward.


Günter


On 09.07.2017 16:17, Ted Yu wrote:
Since the exception was about a missing method (withZoneUTC) instead 
of class not found, it was likely due to a conflicting joda time jar 
being on the classpath.


Cheers

On Sun, Jul 9, 2017 at 1:22 AM, Günter Hipler
<guenter.hip...@unibas.ch> wrote:


Hi,

sorry for this newbie question...

I'm following the data artisans exercises and wanted to run the
TaxiRide Cleansing job on my local cluster (version 1.3.1)
(http://training.data-artisans.com/exercises/rideCleansing.html)

While this is possible within my IDE, the cluster throws an
exception because of a missing type, although the missing type is
part of the application jar the cluster is provided with.


swissbib@ub-sbhp02:~/environment/code/flink_einarbeitung/training/flink-java-project/target$ jar tf flink-java-project-0.1.jar | grep DateTimeFormatter
org/elasticsearch/common/joda/FormatDateTimeFormatter.class
org/joda/time/format/DateTimeFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$CharacterLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$Composite.class
org/joda/time/format/DateTimeFormatterBuilder$FixedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$Fraction.class
org/joda/time/format/DateTimeFormatterBuilder$MatchingParser.class
org/joda/time/format/DateTimeFormatterBuilder$NumberFormatter.class
org/joda/time/format/DateTimeFormatterBuilder$PaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder$StringLiteral.class
org/joda/time/format/DateTimeFormatterBuilder$TextField.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneId.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneName.class
org/joda/time/format/DateTimeFormatterBuilder$TimeZoneOffset.class
org/joda/time/format/DateTimeFormatterBuilder$TwoDigitYear.class
org/joda/time/format/DateTimeFormatterBuilder$UnpaddedNumber.class
org/joda/time/format/DateTimeFormatterBuilder.class

Any advice? Thanks!

Günter


swissbib@ub-sbhp02:/usr/local/swissbib/flink$ bin/flink run -c org.apache.flink.quickstart.StreamingJob /home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/target/flink-java-project-0.1.jar --input /home/swissbib/environment/code/flink_einarbeitung/training/flink-java-project/data/nycTaxiRides.gz
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/swissbib/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/swissbib/environment/tools/hbase-1.2.1/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/swissbib/environment/tools/hadoop-2.5.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8081
Starting execution of program
Submitting job with JobID: 32c7f2d0bbcac4d8c0367639ea928014.
Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1464375722] with leader session id ----.
07/09/2017 09:31:51    Job execution switched to
07/09/2017 09:31:51Job execution switched to 
