Split Stream on a Split Stream

2019-02-26 Thread Taher Koitawala
Hi All,
  We are currently working with Flink version 1.7.2, and we are getting
the exception below when doing a split on a split.

SplitStream splitStream = stream1.split(new SomeSplitLogic());

DataStream select1 = splitStream.select("1");
DataStream select2 = splitStream.select("2");


select2.split(new AnotherSplitLogic()).select("3");


Basically, the exception recommends using side outputs; however, the only
way I see to get a side output is by using a process function. Can someone
suggest a better way of doing this?

Exception :
Caused by: java.lang.IllegalStateException:  Consecutive multiple splits
are not supported. Splits are deprecated. Please use side-outputs
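
A minimal sketch of the side-output alternative (illustrative only, not code
from this thread; the String element type and the routing conditions are made
up). One ProcessFunction can replace both levels of splitting by routing
records to OutputTags, so no consecutive split() calls are needed:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputInsteadOfSplit {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream1 = env.fromElements("1:a", "2:b", "2:3:c");

        // Anonymous subclasses so Flink can capture the element type information.
        final OutputTag<String> tag1 = new OutputTag<String>("1") {};
        final OutputTag<String> tag2 = new OutputTag<String>("2") {};
        final OutputTag<String> tag3 = new OutputTag<String>("3") {};

        // One ProcessFunction stands in for both split steps of the thread.
        SingleOutputStreamOperator<String> routed = stream1
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("1")) {
                            ctx.output(tag1, value);
                        } else if (value.startsWith("2")) {
                            ctx.output(tag2, value);
                            if (value.contains(":3:")) {   // the former "split on a split"
                                ctx.output(tag3, value);
                            }
                        }
                    }
                });

        DataStream<String> select1 = routed.getSideOutput(tag1);
        DataStream<String> select2 = routed.getSideOutput(tag2);
        DataStream<String> select3 = routed.getSideOutput(tag3);

        select3.print();
        env.execute("side-output sketch");
    }
}
```

Each getSideOutput() call corresponds to what select("n") returned before, and
the resulting streams can be processed further without hitting the
consecutive-split restriction.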


Regards,
Taher Koitawala
GS Lab Pune
+91 8407979163


submit job failed on Yarn HA

2019-02-26 Thread 孙森
Hi all:

I run Flink (1.5.1 with Hadoop 2.7) on YARN and submit the job with 
“/usr/local/flink/bin/flink run -m jmhost:port my.jar”, but the submission 
fails.
The HA configuration is :
 high-availability: zookeeper
 high-availability.storageDir: hdfs:///flink/ha/
 high-availability.zookeeper.quorum:  hdp1:2181,hdp2:2181,hdp3:2181
 yarn.application-attempts: 2
The info shown in the client log:

2019-02-27 11:48:38,651 INFO  org.apache.flink.runtime.rest.RestClient  
- Shutting down rest endpoint.
2019-02-27 11:48:38,659 INFO  org.apache.flink.runtime.rest.RestClient  
- Rest endpoint shutdown complete.
2019-02-27 11:48:38,662 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/rest_server_lock.
2019-02-27 11:48:38,665 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService /leader/dispatcher_lock.
2019-02-27 11:48:38,670 INFO  
org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
  - backgroundOperationsLoop exiting
2019-02-27 11:48:38,689 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
0x2679c52880c00ee closed
2019-02-27 11:48:38,689 INFO  
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
EventThread shut down for session: 0x2679c52880c00ee
2019-02-27 11:48:38,690 ERROR org.apache.flink.client.cli.CliFrontend   
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result.
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 

Re: flink list and flink run commands timeout

2019-02-26 Thread sen
Hi Aneesha:

  I am also facing the same problem. When I turn on HA on YARN, I get
the same exception. When I turn off the HA configuration, it works fine.
  I want to know: what did you do to deal with the problem?

Thanks!
Sen Sun



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Errors running user jars with flink in docker flink x

2019-02-26 Thread Paroma Sengupta
> Hi,
> I am trying to run my Flink application through Docker containers. For
> that I made use of the code present here: flink_docker.
> However, when I try to run the Docker image, it fails with this
> error message:
>


> ```WARN org.apache.flink.client.cli.CliFrontend - Could not load CLI class
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.
> java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1238)
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1183)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1109)
> Caused by: java.lang.NoSuchFieldError: MODE
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.(FlinkYarnSessionCli.java:185)
> at
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.(FlinkYarnSessionCli.java:172)
> ... 7 more
> Exception in thread "main" java.lang.NoSuchFieldError: MODE
> at
> org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1192)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1109)```
>
> Any pointers for solving this issue will be thoroughly appreciated.
>

*the Docker file has the following contents:*

FROM java:8-jre-alpine

# Install requirements
RUN apk add --no-cache bash snappy

# Flink environment variables
ENV FLINK_INSTALL_PATH=/opt
ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
ENV FLINK_LIB_DIR $FLINK_HOME/lib
ENV PATH $PATH:$FLINK_HOME/bin

# flink-dist can point to a directory or a tarball on the local system
ARG flink_dist=NOT_SET
ARG job_jar=NOT_SET

# Install build dependencies and flink
ADD $flink_dist $FLINK_INSTALL_PATH
ADD $job_jar $FLINK_INSTALL_PATH/job.jar

RUN set -x && \
  ln -s $FLINK_INSTALL_PATH/flink-* $FLINK_HOME && \
  ln -s $FLINK_INSTALL_PATH/job.jar $FLINK_LIB_DIR && \
  addgroup -S flink && adduser -D -S -H -G flink -h $FLINK_HOME flink && \
  chown -R flink:flink $FLINK_INSTALL_PATH/flink-* && \
  chown -h flink:flink $FLINK_HOME

COPY docker-entrypoint.sh /

USER flink
EXPOSE 8081 6123
ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["--help"]

*The contents of the docker-entrypoint.sh:*
### If unspecified, the hostname of the container is taken as the JobManager address
FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"}
#FLINK_INSTALL_PATH=${FLINK_INSTALL_PATH:-"/opt"}

JOB_CLUSTER="job-cluster"
TASK_MANAGER="task-manager"
RUN_FATJAR="run-fatJar"
CMD="$1"
FATJAR="$2"
shift;

if [ "${CMD}" == "--help" -o "${CMD}" == "-h" ]; then
echo "Usage: $(basename $0)
(${JOB_CLUSTER}|${TASK_MANAGER}|${RUN_FATJAR})"

elif [ "${CMD}" == "${JOB_CLUSTER}" -o "${CMD}" == "${TASK_MANAGER}" -o
"${CMD}" == "${RUN_FATJAR}" ]; then
echo "Starting the ${CMD}"

if [ "${CMD}" == "${TASK_MANAGER}" ]; then
exec $FLINK_HOME/bin/taskmanager.sh start-foreground "$@"
elif [ "${CMD}" == "${RUN_FATJAR}" ]; then
echo "Starting fatJar at $FLINK_HOME/lib"
exec $FLINK_HOME/bin/flink run $FLINK_HOME/lib
else
exec $FLINK_HOME/bin/standalone-job.sh start-foreground "$@"
fi
else
echo  "flink home ${FLINK_HOME}"
echo "the command ${CMD} does not exist"
fi

#exec "$@"
*The command I am running is:*
docker run  run-fatJar




>
> Sincerely,
> Paroma
>


Re: Some issues found while trying out BlinkSQL

2019-02-26 Thread bigdatayunzhongyan
Sorry!

1. SQL parsing issue: 'formatted' is not recognized, while 'desc xxx' works
Flink SQL> desc formatted customer;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table 'formatted customer' was not 
found.

Flink SQL> desc customer;
root
 |-- name: c_customer_sk
 |-- type: IntType
 |-- isNullable: true

2. Hive external tables are not supported
Flink SQL> select * from customer limit 10;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.

Reason: 
The matching factory 
'org.apache.flink.streaming.connectors.hive.HiveTableFactory' doesn't support 
'external'.

3. Hive 1.x compatibility issue with desc:
 Invalid method name: 'get_table_req'


Thanks!
From: Jark Wu
Sent: 2019-02-27 12:15:27
To: user-zh@flink.apache.org
Subject: Re: Some issues found while trying out BlinkSQL
Hi,

The mailing list does not support images; you can send a link to the image instead.

Thanks,
Jark

On Wed, 27 Feb 2019 at 11:17, bigdatayunzhongyan
 wrote:

>
> Hi all,
> I found some issues while trying out BlinkSQL, summarized as follows:
> Hive version 2.3.4
> 1. SQL parsing issue: 'formatted' is not recognized
>
> 2. Hive external tables are not supported
> 3. Hive 1.x compatibility issue
>
>


Errors running user jars with flink in docker flink x

2019-02-26 Thread Paroma Sengupta
Hi,
I am trying to run my Flink application through Docker containers. For that
I made use of the code present here: flink_docker.
However, when I try to run the Docker image, it fails with this
error message: ```WARN org.apache.flink.client.cli.CliFrontend - Could not
load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1238)
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1183)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1109)
Caused by: java.lang.NoSuchFieldError: MODE
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.(FlinkYarnSessionCli.java:185)
at
org.apache.flink.yarn.cli.FlinkYarnSessionCli.(FlinkYarnSessionCli.java:172)
... 7 more
Exception in thread "main" java.lang.NoSuchFieldError: MODE
at
org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1192)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1109)```

Any pointers for solving this issue will be thoroughly appreciated.

Sincerely,
Paroma


Re: left join failing with FlinkLogicalJoinConverter NPE

2019-02-26 Thread Xingcan Cui
Hi Karl,

I think this is a bug and created FLINK-11769 
 to track it.

Best,
Xingcan

> On Feb 26, 2019, at 2:02 PM, Karl Jin  wrote:
> 
> I removed the multiset> field and the join worked fine. 
> The field was created from a Kafka source through a query that looks like 
> "select collect(data) as i_data from ... group by pk"
> 
> Do you think this is a bug or is this something I can get around using some 
> configuration?
> 
> On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui  > wrote:
> Yes. Please check that. If it's the nested type's problem, this might be a 
> bug.
> 
> On Mon, Feb 25, 2019, 21:50 Karl Jin  > wrote:
> Do you think something funky might be happening with Map/Multiset types? If 
> so how do I deal with it (I think I can verify by removing those columns and 
> retry?)?
> 
> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  > wrote:
> Thanks for checking in quickly,
> 
> Below is what I got on printSchema on the two tables (left joining the second 
> one to the first one on uc_pk = i_uc_pk). rowtime in both are extracted from 
> the string field uc_update_ts
> 
> root
>  |-- uc_pk: String
>  |-- uc_update_ts: String
>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>  |-- uc_version: String
>  |-- uc_type: String
>  |-- data_parsed: Map
> 
> root
>  |-- i_uc_pk: String
>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>  |-- image_count: Long
>  |-- i_data: Multiset>
> 
> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui  > wrote:
> Hi Karl,
> 
> It seems that some field types of your inputs were not properly extracted. 
> Could you share the result of `printSchema()` for your input tables?
> 
> Best,
> Xingcan
> 
> > On Feb 25, 2019, at 4:35 PM, Karl Jin  > > wrote:
> > 
> > Hello,
> > 
> > First time posting, so please let me know if the formatting isn't correct, 
> > etc.
> > 
> > I'm trying to left join two Kafka sources, running 1.7.2 locally, but 
> > getting the below exception. Looks like some sort of query optimization 
> > process but I'm not sure where to start investigating/debugging. I see 
> > things are marked as NONE in the object so that's a bit of a flag to me, 
> > although I don't know for sure. Any pointer would be much appreciated:
> > 
> > Exception in thread "main" java.lang.RuntimeException: Error while applying 
> > rule FlinkLogicalJoinConverter, args 
> > [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
> >  $6),joinType=left)]
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
> >   at 
> > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
> >   at 
> > org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
> >   at 
> > org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
> >   at 
> > org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
> >   at 
> > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
> >   at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
> >   at 
> > org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
> >   at 
> > org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
> > ...
> > Caused by: java.lang.RuntimeException: Error occurred while applying rule 
> > FlinkLogicalJoinConverter
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
> >   at 
> > org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
> >   at 
> > org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
> >   at 
> > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
> >   ... 11 more
> > Caused by: java.lang.NullPointerException
> >   at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
> >   at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >   at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
> >   at 
> > org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
> >   at 
> > org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
> >   at 
> > 

okio and okhttp not shaded in the Flink Uber Jar on EMR

2019-02-26 Thread Austin Cawley-Edwards
Hi,

I recently experienced versioning clashes with the okio and okhttp libraries
when trying to deploy a Flink 1.6.0 app to AWS EMR on Hadoop 2.8.4. After
investigating and talking to the okio team (see this issue), I found that
both okio and
okhttp exist in the Flink uber jar with versions 1.4.0 and 2.4.0,
respectively, whereas I'm including versions 2.2.2 and 3.13.1 in my shaded
jar. The okio team suggested that Flink should shade the uber jar to fix
the issue, but I'm wondering if there is something I can do on my end to
have all versions exist simultaneously.
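
For what it's worth, here is a small diagnostic sketch (my own, not from this
thread) that prints which jar a class is actually loaded from at runtime;
running it from the job's main() on the cluster can confirm whether the uber
jar's okio/okhttp or the versions bundled in the application jar win on the
classpath. The class names are just examples and should be adjusted to
whatever is clashing:

```java
// Prints the code source (jar) a class resolves to at runtime.
public class ClasspathProbe {

    public static void main(String[] args) throws ClassNotFoundException {
        printOrigin("okio.Okio");              // okio
        printOrigin("okhttp3.OkHttpClient");   // okhttp 3.x (older versions use com.squareup.okhttp)
    }

    static void printOrigin(String className) throws ClassNotFoundException {
        Class<?> clazz = Class.forName(className);
        java.security.CodeSource src = clazz.getProtectionDomain().getCodeSource();
        System.out.println(className + " loaded from "
                + (src != null ? src.getLocation() : "<bootstrap/unknown>"));
    }
}
```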

From the issue, here are the okio contents of the uber jar:

*jar -tf flink-shaded-hadoop2-uber-1.6.0.jar | grep okio*

META-INF/maven/com.squareup.okio/
META-INF/maven/com.squareup.okio/okio/
META-INF/maven/com.squareup.okio/okio/pom.properties
META-INF/maven/com.squareup.okio/okio/pom.xml
okio/
okio/AsyncTimeout$1.class
okio/AsyncTimeout$2.class
okio/AsyncTimeout$Watchdog.class
okio/AsyncTimeout.class
okio/Base64.class
okio/Buffer$1.class
okio/Buffer$2.class
okio/Buffer.class
okio/BufferedSink.class
okio/BufferedSource.class
okio/ByteString.class
okio/DeflaterSink.class
okio/ForwardingSink.class
okio/ForwardingSource.class
okio/ForwardingTimeout.class
okio/GzipSink.class
okio/GzipSource.class
okio/InflaterSource.class
okio/Okio$1.class
okio/Okio$2.class
okio/Okio$3.class
okio/Okio.class
okio/RealBufferedSink$1.class
okio/RealBufferedSink.class
okio/RealBufferedSource$1.class
okio/RealBufferedSource.class
okio/Segment.class
okio/SegmentPool.class
okio/SegmentedByteString.class
okio/Sink.class
okio/Source.class
okio/Timeout$1.class
okio/Timeout.class
okio/Util.class

Thank you,
Austin Cawley-Edwards


Re: Some issues found while trying out BlinkSQL

2019-02-26 Thread Jark Wu
Hi,

The mailing list does not support images; you can send a link to the image instead.

Thanks,
Jark

On Wed, 27 Feb 2019 at 11:17, bigdatayunzhongyan
 wrote:

>
> Hi all,
> I found some issues while trying out BlinkSQL, summarized as follows:
> Hive version 2.3.4
> 1. SQL parsing issue: 'formatted' is not recognized
>
> 2. Hive external tables are not supported
> 3. Hive 1.x compatibility issue
>
>


Re: ProgramInvocationException when I submit job by 'flink run' after running Flink stand-alone more than 1 month?

2019-02-26 Thread Zhenghua Gao
It seems like there is something wrong with the RestServer and the RestClient
didn't connect to it.
You can check the standalonesession log to investigate the cause.

btw: The cause of "no cluster was found" is that your pid information was
cleaned up for some reason.
The pid information is stored in your TMP directory; it should look like
/tmp/flink-user-taskexecutor.pid or /tmp/flink-user-standalonesession.pid

On Wed, Feb 27, 2019 at 10:27 AM Son Mai  wrote:

> Hi,
> I'm having a question regarding Flink.
> I'm running Flink in stand-alone mode on 1 host (JobManager, TaskManager
> on the same host). At first, I'm able to submit and cancel jobs normally,
> the jobs showed up in the web UI and ran.
> However, after ~1 month, when I canceled the old job and submitted a new
> one, I faced *org.apache.flink.client.program.ProgramInvocationException:
> Could not retrieve the execution result.*
> At this moment, I was able to run *flink list* to list current jobs and *flink
> cancel* to cancel the job, but *flink run* failed. Exception was thrown
> and the job was not shown in the web UI.
> When I tried to stop the current stand-alone cluster using *stop-cluster*,
> it said 'no cluster was found'. Then I had to find the pid of flink
> processes and stop them manually. Then if I run *start-cluster* to create
> a new stand-alone cluster, I was able to submit jobs normally.
> The shortened stack-trace: (full stack-trace at google docs link
> 
> )
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 7ef1cbddb744cd5769297f4059f7c531)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob
> (RestClusterClient.java:261)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.ConnectionClosedException: Channel became
> inactive.
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException:
> Channel became inactive.
> ... 37 more
> The error is consistent. It always happens after I let Flink run for a
> while, usually more than 1 month. Why am I not able to submit a job to Flink
> after a while? What happened here?
> Regards,
>
> Son
>


Some issues found while trying out BlinkSQL

2019-02-26 Thread bigdatayunzhongyan

Hi all,
I found some issues while trying out BlinkSQL, summarized as follows:
Hive version 2.3.4
1. SQL parsing issue: 'formatted' is not recognized


2. Hive external tables are not supported

3. Hive 1.x compatibility issue



Re: ProgramInvocationException when I submit job by 'flink run' after running Flink stand-alone more than 1 month?

2019-02-26 Thread Benchao Li
Hi Son,

According to your description, maybe it's caused by the '/tmp' file system
retention policy, which removes temporary files regularly.

On Wed, Feb 27, 2019 at 10:27 AM Son Mai  wrote:

> Hi,
> I'm having a question regarding Flink.
> I'm running Flink in stand-alone mode on 1 host (JobManager, TaskManager
> on the same host). At first, I'm able to submit and cancel jobs normally,
> the jobs showed up in the web UI and ran.
> However, after ~1 month, when I canceled the old job and submitted a new
> one, I faced *org.apache.flink.client.program.ProgramInvocationException:
> Could not retrieve the execution result.*
> At this moment, I was able to run *flink list* to list current jobs and *flink
> cancel* to cancel the job, but *flink run* failed. Exception was thrown
> and the job was not shown in the web UI.
> When I tried to stop the current stand-alone cluster using *stop-cluster*,
> it said 'no cluster was found'. Then I had to find the pid of flink
> processes and stop them manually. Then if I run *start-cluster* to create
> a new stand-alone cluster, I was able to submit jobs normally.
> The shortened stack-trace: (full stack-trace at google docs link
> 
> )
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 7ef1cbddb744cd5769297f4059f7c531)
> at org.apache.flink.client.program.rest.RestClusterClient.submitJob
> (RestClusterClient.java:261)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
> Caused by: java.util.concurrent.CompletionException:
> org.apache.flink.runtime.rest.ConnectionClosedException: Channel became
> inactive.
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException:
> Channel became inactive.
> ... 37 more
> The error is consistent. It always happens after I let Flink run for a
> while, usually more than 1 month. Why am I not able to submit a job to Flink
> after a while? What happened here?
> Regards,
>
> Son
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


ProgramInvocationException when I submit job by 'flink run' after running Flink stand-alone more than 1 month?

2019-02-26 Thread Son Mai
Hi,
I'm having a question regarding Flink.
I'm running Flink in stand-alone mode on 1 host (JobManager, TaskManager on
the same host). At first, I'm able to submit and cancel jobs normally, the
jobs showed up in the web UI and ran.
However, after ~1 month, when I canceled the old job and submitted a new
one, I faced *org.apache.flink.client.program.ProgramInvocationException:
Could not retrieve the execution result.*
At this moment, I was able to run *flink list* to list current jobs and *flink
cancel* to cancel the job, but *flink run* failed. Exception was thrown and
the job was not shown in the web UI.
When I tried to stop the current stand-alone cluster using *stop-cluster*,
it said 'no cluster was found'. Then I had to find the pid of flink
processes and stop them manually. Then if I run *start-cluster* to create a
new stand-alone cluster, I was able to submit jobs normally.
The shortened stack-trace: (full stack-trace at google docs link

)
org.apache.flink.client.program.ProgramInvocationException: Could not
retrieve the execution result. (JobID: 7ef1cbddb744cd5769297f4059f7c531)
at org.apache.flink.client.program.rest.RestClusterClient.submitJob
(RestClusterClient.java:261)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
Could not complete the operation. Number of retries has been exhausted.
Caused by: java.util.concurrent.CompletionException:
org.apache.flink.runtime.rest.ConnectionClosedException: Channel became
inactive.
Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
became inactive.
... 37 more
The error is consistent. It always happens after I let Flink run for a
while, usually more than 1 month. Why am I not able to submit a job to Flink
after a while? What happened here?
Regards,

Son


One source is much slower than the other side when join history data

2019-02-26 Thread 刘建刚
  When consuming historical data in a join operator with event time, reading
data from one source is much slower than from the other. As a result, the join
operator will cache a lot of data from the faster source while waiting for the
slower source.
  The question is: how can I keep the difference in the consumers' speeds
small?


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-26 Thread vino yang
Great job. Stephan!

Best,
Vino

On Wed, Feb 27, 2019 at 2:27 AM Jamie Grier  wrote:

> This is awesome, Stephan!  Thanks for doing this.
>
> -Jamie
>
>
> On Tue, Feb 26, 2019 at 9:29 AM Stephan Ewen  wrote:
>
>> Here is the pull request with a draft of the roadmap:
>> https://github.com/apache/flink-web/pull/178
>>
>> Best,
>> Stephan
>>
>> On Fri, Feb 22, 2019 at 5:18 AM Hequn Cheng  wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for summarizing the great roadmap! It is very helpful for users
>>> and developers to track the direction of Flink.
>>> +1 for putting the roadmap on the website and update it per release.
>>>
>>> Besides, would be great if the roadmap can add the UpsertSource
>>> feature(maybe put it under 'Batch Streaming Unification').
>>> It has been discussed a long time ago[1,2] and is moving forward step by
>>> step.
>>> Currently, Flink can only emit upsert results. With the UpsertSource, we
>>> can make our system a more complete one.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
>>> [2] https://issues.apache.org/jira/browse/FLINK-8545
>>> 
>>>
>>>
>>>
>>> On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:
>>>
 Hi Stephan,

 Yes. I completely agree. Jincheng & Jark gave some very valuable
 feedbacks and suggestions and I think we can definitely move the
 conversation forward to reach a more concrete doc first before we put in to
 the roadmap. Thanks for reviewing it and driving the roadmap effort!

 --
 Rong

 On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:

> Hi Rong Rong!
>
> I would add the security / kerberos threads to the roadmap. They seem
> to be advanced enough in the discussions so that there is clarity what 
> will
> come.
>
> For the window operator with slicing, I would personally like to see
> the discussion advance and have some more clarity and consensus on the
> feature before adding it to the roadmap. Not having that in the first
> version of the roadmap does not mean there will be no activity. And when
> the discussion advances well in the next weeks, we can update the roadmap
> soon.
>
> What do you think?
>
> Best,
> Stephan
>
>
> On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:
>
>> Hi Stephan,
>>
>> Thanks for the clarification, yes I think these issues have already
>> been discussed in previous mailing list threads [1,2,3].
>>
>> I also agree that updating the "official" roadmap every release is a
>> very good idea to avoid frequent update.
>> One question I might've been a bit confused about is: are we suggesting to
>> keep one roadmap on the documentation site (e.g. [4]) per release, or
>> simply just one most up-to-date roadmap in the main website [5] ?
>> Just like the release notes in every release, the former will
>> probably provide a good tracker for users to look back at previous 
>> roadmaps
>> as well I am assuming.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
>> [5] https://flink.apache.org/
>>
>> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen 
>> wrote:
>>
>>> I think the website is better as well.
>>>
>>> I agree with Fabian that the wiki is not so visible, and visibility
>>> is the main motivation.
>>> This type of roadmap overview would not be updated by everyone -
>>> letting committers update the roadmap means the listed threads are 
>>> actually
>>> happening at the moment.
>>>
>>>
>>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>>> wrote:
>>>
 Hi,

 I like the idea of putting the roadmap on the website because it is
 much more visible (and IMO more credible, obligatory) there.
 However, I share the concerns about frequent updates.

 I think it would be great to update the "official" roadmap on the
 website once per release (-bugfix releases), i.e., every three months.
 We can use the wiki to collect and draft the roadmap for the next
 update.

 Best, Fabian


 On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Stephan,
>
> Thanks for this proposal. It is a good 

Re: Flink window triggering and timing on connected streams

2019-02-26 Thread Rong Rong
Hi Andrew,

To add to the answers Till and Hequn already provided: the WindowOperator
operates on a per-key-group basis, so as long as you only have one open
session per partition key group, you should be able to manage the windowing
using the watermark strategy Hequn mentioned.
As Till mentioned, the watermark is the minimum of the connected streams'
watermarks, so you should be able to just use a "session window with long
timeout" as you described.

One thought is that have you looked at Flink CEP[1]? This use case seems to
fit pretty well if you can do the co-stream function as a first stage.

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html
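
As an illustration only (my own sketch, not code from this thread), a custom
Trigger along these lines can fire and purge a session window as soon as an
"end session" element arrives, while still falling back to the watermark-based
timeout; the EndAware interface is a made-up placeholder for however the end
event is detected. Note that registered event-time timers fire when the
watermark passes their timestamp, not when an element with a later timestamp
arrives:

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Made-up marker for elements that can signal the end of a session.
interface EndAware {
    boolean isSessionEnd();
}

public class EndEventTrigger<T extends EndAware> extends Trigger<T, TimeWindow> {

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) {
        if (element.isSessionEnd()) {
            // Fire and drop the window state as soon as the end event shows up.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Fallback: fire when the watermark passes the window end (the long session timeout).
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Called when the watermark passes `time`.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Session windows merge; re-register the timer for the merged window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

It would be attached with something like
.window(EventTimeSessionWindows.withGap(...)).trigger(new EndEventTrigger<>())
after the co-stream stage, but again this is a sketch, not a drop-in solution.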

On Tue, Feb 26, 2019 at 2:31 AM Till Rohrmann  wrote:

> Hi Andrew,
>
> if using connected streams (e.g. CoFlatMapFunction or CoMapFunction), then
> the watermarks will be synchronized across both inputs. Concretely, you
> will always emit the minimum of the watermarks arriving on input channel 1
> and 2. Take a look at AbstractStreamOperator.java:773-804.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 4:27 AM Andrew Roberts  wrote:
>
>> I’m not sure that approach will work for me, as I have many sessions
>> going at the same time which can overlap. Also, I need to be able to have
>> sessions time out if they never receive an end event. Do you know directly
>> if setting a timer triggers when any timestamp passes that time, or when
>> the watermark passes that time?
>>
>>
>> On Feb 25, 2019, at 9:08 PM, Hequn Cheng  wrote:
>>
>> Hi Andrew,
>>
>> >  I have an “end session” event that I want to cause the window to fire
>> and purge.
>> Do you want to fire the window only by the 'end session' event? I see one
>> option to solve the problem. You can use a tumbling window(say 5s) and set
>> your timestamp to t‘+5s each time receiving an 'end session' event in your
>> user-defined `AssignerWithPeriodicWatermarks`.
>>
>> > My understanding is that this is what the trailing watermark is for,
>> and that in connected streams, the lowest (earliest) watermark of the input
>> streams is what is seen as the watermark downstream.
>> Yes, and we can make use of this to make window fires only on 'end
>> session' event using the solution above.
>>
>> Best, Hequn
>>
>>
>> On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts  wrote:
>>
>>> Hello,
>>>
>>> I’m trying to implement session windows over a set of connected streams
>>> (event time), with some custom triggering behavior. Essentially, I allow
>>> very long session gaps, but I have an “end session” event that I want to
>>> cause the window to fire and purge. I’m assigning timestamps and watermarks
>>> using BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for
>>> the watermark. I have things mostly wired up, but I have some confusion
>>> about how I can ensure that my streams stay “in sync” relative to time.
>>>
>>>  Let’s say I am connecting streams A and B. Stream A is where the “end
>>> session” event always comes from. If I have a session involving events from
>>> time t to t’ in stream A, and then at t’ I get an “end session”, I want to
>>> ensure that the window doesn’t fire until stream B has also processed
>>> events (added events to the window) up to time t’. My understanding is that
>>> this is what the trailing watermark is for, and that in connected streams,
>>> the lowest (earliest) watermark of the input streams is what is seen as the
>>> watermark downstream.
>>>
>>> Currently, I’m setting a timer for the current time + 1 when I see my
>>> “end event”, with the idea that that timer will fire when the WATERMARK
>>> passes that time, i.e., all streams have progressed at least as far as that
>>> end event. However, the implementation of EventTimeTrigger doesn’t really
>>> look like that’s what’s going on.
>>>
>>> Can anyone clear up how these concepts interact? Is there a good model
>>> for this “session end event” concept that I can take a look at?
>>>
>>> Thanks,
>>>
>>> Andrew
>>> --
>>> *Confidentiality Notice: The information contained in this e-mail and any
>>>
>>> attachments may be confidential. If you are not an intended recipient,
>>> you
>>>
>>> are hereby notified that any dissemination, distribution or copying of
>>> this
>>>
>>> e-mail is strictly prohibited. If you have received this e-mail in error,
>>>
>>> please notify the sender and permanently delete the e-mail and any
>>>
>>> attachments immediately. You should not retain, copy or use this e-mail
>>> or
>>>
>>> any attachment for any purpose, nor disclose all or any part of the
>>>
>>> contents to any other person. Thank you.*
>>>
>>
>>
>> *Confidentiality Notice: The information contained in this e-mail and any
>> attachments may be confidential. If you are not an intended recipient, you
>> are hereby notified that any dissemination, distribution or copying of
>> this
>> e-mail is strictly prohibited. If you have received this e-mail in error,
>> please notify the sender and permanently delete the e-mail and 

Re: long lived standalone job session cluster in kubernetes

2019-02-26 Thread Chunhui Shi
Hi Heath and Till, thanks for offering help on reviewing this feature. I
just reassigned the JIRAs to myself after offline discussion with Jin. Let
us work together to get kubernetes integrated natively with flink. Thanks.

On Fri, Feb 15, 2019 at 12:19 AM Till Rohrmann  wrote:

> Alright, I'll get back to you once the PRs are open. Thanks a lot for your
> help :-)
>
> Cheers,
> Till
>
> On Thu, Feb 14, 2019 at 5:45 PM Heath Albritton  wrote:
>
>> My team and I are keen to help out with testing and review as soon as
>> there is a pull request.
>>
>> -H
>>
>> On Feb 11, 2019, at 00:26, Till Rohrmann  wrote:
>>
>> Hi Heath,
>>
>> I just learned that people from Alibaba already made some good progress
>> with FLINK-9953. I'm currently talking to them in order to see how we can
>> merge this contribution into Flink as fast as possible. Since I'm quite
>> busy due to the upcoming release I hope that other community members will
>> help out with the reviewing once the PRs are opened.
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 8, 2019 at 8:50 PM Heath Albritton  wrote:
>>
>>> Has any progress been made on this?  There are a number of folks in
>>> the community looking to help out.
>>>
>>>
>>> -H
>>>
>>> On Wed, Dec 5, 2018 at 10:00 AM Till Rohrmann 
>>> wrote:
>>> >
>>> > Hi Derek,
>>> >
>>> > there is this issue [1] which tracks the active Kubernetes
>>> integration. Jin Sun already started implementing some parts of it. There
>>> should also be some PRs open for it. Please check them out.
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-9953
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Wed, Dec 5, 2018 at 6:39 PM Derek VerLee 
>>> wrote:
>>> >>
>>> >> Sounds good.
>>> >>
>>> >> Is someone working on this automation today?
>>> >>
>>> >> If not, although my time is tight, I may be able to work on a PR for
>>> getting us started down the path Kubernetes native cluster mode.
>>> >>
>>> >>
>>> >> On 12/4/18 5:35 AM, Till Rohrmann wrote:
>>> >>
>>> >> Hi Derek,
>>> >>
>>> >> what I would recommend to use is to trigger the cancel with savepoint
>>> command [1]. This will create a savepoint and terminate the job execution.
>>> Next you simply need to respawn the job cluster which you provide with the
>>> savepoint to resume from.
>>> >>
>>> >> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
>>> >>
>>> >> Cheers,
>>> >> Till
>>> >>
>>> >> On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin <
>>> and...@data-artisans.com> wrote:
>>> >>>
>>> >>> Hi Derek,
>>> >>>
>>> >>> I think your automation steps look good.
>>> >>> Recreating deployments should not take long
>>> >>> and as you mention, this way you can avoid unpredictable old/new
>>> version collisions.
>>> >>>
>>> >>> Best,
>>> >>> Andrey
>>> >>>
>>> >>> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
>>> wrote:
>>> >>> >
>>> >>> > Hi Derek,
>>> >>> >
>>> >>> > I am not an expert in kubernetes, so I will cc Till, who should be
>>> able
>>> >>> > to help you more.
>>> >>> >
>>> >>> > As for the automation for similar process I would recommend having
>>> a
>>> >>> > look at dA platform[1] which is built on top of kubernetes.
>>> >>> >
>>> >>> > Best,
>>> >>> >
>>> >>> > Dawid
>>> >>> >
>>> >>> > [1] https://data-artisans.com/platform-overview
>>> >>> >
>>> >>> > On 30/11/2018 02:10, Derek VerLee wrote:
>>> >>> >>
>>> >>> >> I'm looking at the job cluster mode, it looks great and I and
>>> >>> >> considering migrating our jobs off our "legacy" session cluster
>>> and
>>> >>> >> into Kubernetes.
>>> >>> >>
>>> >>> >> I do need to ask some questions because I haven't found a lot of
>>> >>> >> details in the documentation about how it works yet, and I gave up
>>> >>> >> following the the DI around in the code after a while.
>>> >>> >>
>>> >>> >> Let's say I have a deployment for the job "leader" in HA with ZK,
>>> and
>>> >>> >> another deployment for the taskmanagers.
>>> >>> >>
>>> >>> >> I want to upgrade the code or configuration and start from a
>>> >>> >> savepoint, in an automated way.
>>> >>> >>
>>> >>> >> Best I can figure, I can not just update the deployment resources
>>> in
>>> >>> >> kubernetes and allow the containers to restart in an arbitrary
>>> order.
>>> >>> >>
>>> >>> >> Instead, I expect sequencing is important, something along the
>>> lines
>>> >>> >> of this:
>>> >>> >>
>>> >>> >> 1. issue savepoint command on leader
>>> >>> >> 2. wait for savepoint
>>> >>> >> 3. destroy all leader and taskmanager containers
>>> >>> >> 4. deploy new leader, with savepoint url
>>> >>> >> 5. deploy new taskmanagers
>>> >>> >>
>>> >>> >>
>>> >>> >> For example, I imagine old taskmanagers (with an old version of my
>>> >>> >> job) attaching to the new leader and causing a problem.
>>> >>> >>
>>> >>> >> Does that sound right, or am I overthinking it?
>>> >>> >>
>>> >>> >> If not, has anyone tried implementing any automation for this yet?
>>> >>> >>
>>> >>> >
>>> >>>
>>>
>>


Re: StreamingFileSink on EMR

2019-02-26 Thread kb
Thanks! This fixed it.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hi,

That jar should exist for all the 1.7 versions, but I was replacing the libs
of the Flink provided by AWS EMR (1.7.0) with the more recent ones. But
you could download the 1.7.0 distribution and copy the
flink-s3-fs-hadoop-1.7.0.jar from there into the /usr/lib/flink/lib folder.

But knowing there is a more recent 1.7 release out there, I prefer
replacing the one in the EMR with this one. To do so, we basically replace
the libs in the /usr/lib/flink/lib folder with the ones from the most recent
distribution.

Cheers,

Bruno

On Tue, 26 Feb 2019 at 21:37, kb  wrote:

> Hi,
>
> So 1.7.2 jar has the fix?
>
> Thanks
> Kevin
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: StreamingFileSink on EMR

2019-02-26 Thread kb
Hi,

So 1.7.2 jar has the fix?

Thanks
Kevin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reducing the number of unique metrics

2019-02-26 Thread shkob1
Hey All,

Just read the excellent monitoring blog post
https://flink.apache.org/news/2019/02/25/monitoring-best-practices.html

I'm looking at reducing the number of unique metrics, especially for items I
can compromise on and consolidate, such as using indices instead of IDs.
Specifically, looking at task managers, I don't mind just having an index of
the task manager, such that multiple runs of the Flink application (and YARN
containers on EMR) won't create more unique metric paths but rather reuse
existing paths (tm.1, tm.2, ...). Has anyone managed to do this or had to meet
the same requirement?

Thanks!
Shahar



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: left join failing with FlinkLogicalJoinConverter NPE

2019-02-26 Thread Karl Jin
I removed the multiset> field and the join worked fine.
The field was created from a Kafka source through a query that looks like
"select collect(data) as i_data from ... group by pk"

Do you think this is a bug or is this something I can get around using some
configuration?
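
In case it helps other readers, here is a rough workaround sketch (my own,
with made-up table and column names, and tiny in-memory stand-ins for the
Kafka sources from the thread): keep the collect()-produced Multiset column
out of the join input and only carry scalar columns through the left join:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class MultisetJoinWorkaroundSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Tiny stand-ins for the Kafka-backed tables described in the thread.
        DataStream<Tuple2<String, String>> updates =
                env.fromElements(Tuple2.of("pk-1", "v1"), Tuple2.of("pk-2", "v1"));
        DataStream<Tuple2<String, String>> images =
                env.fromElements(Tuple2.of("pk-1", "img-a"), Tuple2.of("pk-1", "img-b"));
        tableEnv.registerDataStream("updates", updates, "uc_pk, uc_version");
        tableEnv.registerDataStream("image_source", images, "pk, data");

        // Aggregate WITHOUT collect(), so no Multiset column reaches the join.
        Table imageCounts = tableEnv.sqlQuery(
                "SELECT pk AS i_uc_pk, COUNT(*) AS image_count FROM image_source GROUP BY pk");
        tableEnv.registerTable("image_counts", imageCounts);

        // Left join on scalar columns only.
        Table joined = tableEnv.sqlQuery(
                "SELECT u.uc_pk, u.uc_version, i.image_count "
                        + "FROM updates u LEFT JOIN image_counts i ON u.uc_pk = i.i_uc_pk");

        tableEnv.toRetractStream(joined, Row.class).print();
        env.execute("multiset join workaround sketch");
    }
}
```

The collect()-built Multiset can then be produced in a later stage, or added
back once FLINK-11769 is fixed; this is only meant to show the shape of the
workaround, not a definitive fix.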

On Tue, Feb 26, 2019 at 1:20 AM Xingcan Cui  wrote:

> Yes. Please check that. If it's the nested type's problem, this might be a
> bug.
>
> On Mon, Feb 25, 2019, 21:50 Karl Jin  wrote:
>
>> Do you think something funky might be happening with Map/Multiset types?
>> If so how do I deal with it (I think I can verify by removing those columns
>> and retry?)?
>>
>> On Mon, Feb 25, 2019 at 6:28 PM Karl Jin  wrote:
>>
>>> Thanks for checking in quickly,
>>>
>>> Below is what I got on printSchema on the two tables (left joining the
>>> second one to the first one on uc_pk = i_uc_pk). rowtime in both are
>>> extracted from the string field uc_update_ts
>>>
>>> root
>>>  |-- uc_pk: String
>>>  |-- uc_update_ts: String
>>>  |-- rowtime: TimeIndicatorTypeInfo(rowtime)
>>>  |-- uc_version: String
>>>  |-- uc_type: String
>>>  |-- data_parsed: Map
>>>
>>> root
>>>  |-- i_uc_pk: String
>>>  |-- i_uc_update_ts: TimeIndicatorTypeInfo(rowtime)
>>>  |-- image_count: Long
>>>  |-- i_data: Multiset>
>>>
>>> On Mon, Feb 25, 2019 at 4:54 PM Xingcan Cui  wrote:
>>>
 Hi Karl,

 It seems that some field types of your inputs were not properly
 extracted.
 Could you share the result of `printSchema()` for your input tables?

 Best,
 Xingcan

 > On Feb 25, 2019, at 4:35 PM, Karl Jin  wrote:
 >
 > Hello,
 >
 > First time posting, so please let me know if the formatting isn't
 correct, etc.
 >
 > I'm trying to left join two Kafka sources, running 1.7.2 locally, but
 getting the below exception. Looks like some sort of query optimization
 process but I'm not sure where to start investigating/debugging. I see
 things are marked as NONE in the object so that's a bit of a flag to me,
 although I don't know for sure. Any pointer would be much appreciated:
 >
 > Exception in thread "main" java.lang.RuntimeException: Error while
 applying rule FlinkLogicalJoinConverter, args
 [rel#94:LogicalJoin.NONE(left=rel#84:Subset#0.NONE,right=rel#93:Subset#5.NONE,condition==($0,
 $6),joinType=left)]
 >   at
 org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:236)
 >   at
 org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
 >   at
 org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
 >   at
 org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:373)
 >   at
 org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
 >   at
 org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
 >   at
 org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
 >   at
 org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:205)
 >   at
 org.apache.flink.table.api.scala.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:185)
 >   at
 org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:143)
 > ...
 > Caused by: java.lang.RuntimeException: Error occurred while applying
 rule FlinkLogicalJoinConverter
 >   at
 org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:149)
 >   at
 org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
 >   at
 org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:141)
 >   at
 org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
 >   ... 11 more
 > Caused by: java.lang.NullPointerException
 >   at
 org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:84)
 >   at
 org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
 >   at
 org.apache.flink.table.plan.nodes.FlinkRelNode$class.estimateDataTypeSize(FlinkRelNode.scala:104)
 >   at
 org.apache.flink.table.plan.nodes.logical.FlinkLogicalJoinBase.estimateDataTypeSize(FlinkLogicalJoinBase.scala:29)
 >   at
 org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:80)
 >   at
 org.apache.flink.table.plan.nodes.FlinkRelNode$$anonfun$estimateRowSize$2.apply(FlinkRelNode.scala:79)
 >   at
 scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
 >   at
 

Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hey,

Got it working, basically you need to add the flink-s3-fs-hadoop-1.7.2.jar
libraries from the /opt folder of the flink distribution into the
/usr/lib/flink/lib. That has done the trick for me.

Cheers,

Bruno

On Tue, 26 Feb 2019 at 16:28, kb  wrote:

> Hi Bruno,
>
> Thanks for verifying. We are aiming for the same.
>
> Best,
> Kevin
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-26 Thread Jamie Grier
This is awesome, Stephan!  Thanks for doing this.

-Jamie


On Tue, Feb 26, 2019 at 9:29 AM Stephan Ewen  wrote:

> Here is the pull request with a draft of the roadmap:
> https://github.com/apache/flink-web/pull/178
>
> Best,
> Stephan
>
> On Fri, Feb 22, 2019 at 5:18 AM Hequn Cheng  wrote:
>
>> Hi Stephan,
>>
>> Thanks for summarizing the great roadmap! It is very helpful for users
>> and developers to track the direction of Flink.
>> +1 for putting the roadmap on the website and update it per release.
>>
>> Besides, would be great if the roadmap can add the UpsertSource
>> feature(maybe put it under 'Batch Streaming Unification').
>> It has been discussed a long time ago[1,2] and is moving forward step by
>> step.
>> Currently, Flink can only emit upsert results. With the UpsertSource, we
>> can make our system a more complete one.
>>
>> Best, Hequn
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
>> [2] https://issues.apache.org/jira/browse/FLINK-8545
>> 
>>
>>
>>
>> On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:
>>
>>> Hi Stephan,
>>>
>>> Yes. I completely agree. Jincheng & Jark gave some very valuable
>>> feedbacks and suggestions and I think we can definitely move the
>>> conversation forward to reach a more concrete doc first before we put in to
>>> the roadmap. Thanks for reviewing it and driving the roadmap effort!
>>>
>>> --
>>> Rong
>>>
>>> On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:
>>>
 Hi Rong Rong!

 I would add the security / kerberos threads to the roadmap. They seem
 to be advanced enough in the discussions so that there is clarity what will
 come.

 For the window operator with slicing, I would personally like to see
 the discussion advance and have some more clarity and consensus on the
 feature before adding it to the roadmap. Not having that in the first
 version of the roadmap does not mean there will be no activity. And when
 the discussion advances well in the next weeks, we can update the roadmap
 soon.

 What do you think?

 Best,
 Stephan


 On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:

> Hi Stephan,
>
> Thanks for the clarification, yes I think these issues have already
> been discussed in previous mailing list threads [1,2,3].
>
> I also agree that updating the "official" roadmap every release is a
> very good idea to avoid frequent update.
> One question I might've been a bit confused about is: are we suggesting to
> keep one roadmap on the documentation site (e.g. [4]) per release, or
> simply just one most up-to-date roadmap in the main website [5] ?
> Just like the release notes in every release, the former will probably
> provide a good tracker for users to look back at previous roadmaps as well
> I am assuming.
>
> Thanks,
> Rong
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>
> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
> [5] https://flink.apache.org/
>
> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>
>> I think the website is better as well.
>>
>> I agree with Fabian that the wiki is not so visible, and visibility
>> is the main motivation.
>> This type of roadmap overview would not be updated by everyone -
>> letting committers update the roadmap means the listed threads are 
>> actually
>> happening at the moment.
>>
>>
>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>> wrote:
>>
>>> Hi,
>>>
>>> I like the idea of putting the roadmap on the website because it is
>>> much more visible (and IMO more credible, obligatory) there.
>>> However, I share the concerns about frequent updates.
>>>
>>> I think it would be great to update the "official" roadmap on the
>>> website once per release (-bugfix releases), i.e., every three months.
>>> We can use the wiki to collect and draft the roadmap for the next
>>> update.
>>>
>>> Best, Fabian
>>>
>>>
>>> On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>
 Hi Stephan,

 Thanks for this proposal. It is a good idea to track the roadmap.
 One suggestion is that it might be better to put it into wiki page 
 first.
 Because it is easier to update the roadmap on wiki compared to on 
 flink web
 site. 

Re: Is there a Flink DataSet equivalent to Spark's RDD.persist?

2019-02-26 Thread Andrey Zagrebin
Hi Frank,

This feature is currently under discussion. You can follow it in this issue:
https://issues.apache.org/jira/browse/FLINK-11199

Best,
Andrey
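
For readers of the archive, a minimal sketch of the write-out-and-re-read
workaround Frank describes below (the path and the "expensive" computation
are made up); it trades recomputation for one materialization to a file
system:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;

public class MaterializeIntermediateSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for an expensive intermediate result that is reused twice.
        DataSet<String> expensive = env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return "expensive(" + value + ")";
                    }
                });

        // Materialize it once (path is illustrative), then run this plan.
        String path = "file:///tmp/expensive-intermediate";
        expensive.writeAsText(path, FileSystem.WriteMode.OVERWRITE);
        env.execute("materialize intermediate");

        // Re-read it for the downstream transformations instead of recomputing.
        DataSet<String> cached = env.readTextFile(path);
        cached.filter(s -> s.startsWith("expensive(a")).print();
        cached.filter(s -> s.endsWith(")")).print();
    }
}
```

As Frank notes, this still pays the I/O cost of the write and the re-reads,
so it is only a stopgap until FLINK-11199 lands.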

On Thu, Feb 21, 2019 at 7:41 PM Frank Grimes 
wrote:

> Hi,
>
> I'm trying to port an existing Spark job to Flink and have gotten stuck on
> the same issue brought up here:
>
> https://stackoverflow.com/questions/46243181/cache-and-persist-datasets
>
> Is there some way to accomplish this same thing in Flink?
> i.e. avoid re-computing a particular DataSet when multiple different
> subsequent transformations are required on it.
>
> I've even tried explicitly writing out the DataSet to avoid the
> re-computation but still taking an I/O hit for the initial write to HDFS
> and subsequent re-reading of it in the following stages.
> While it does yield a performance improvement over no caching at all, it
> doesn't match the performance I get with RDD.persist in Spark.
>
> Thanks,
>
> Frank Grimes
>


Re: Submitting job to Flink on yarn timesout on flip-6 1.5.x

2019-02-26 Thread Richard Deurwaarder
Hello Gary,

Thank you for your response.

I'd like to use the new mode but it does not work for me. It seems I am
running into a firewall issue.

Because the rest.port is random when running on YARN [1], the machine I use
to deploy the job can, in fact, start the Flink cluster, but it cannot
submit the job on the randomly chosen port because our firewall blocks it.

Do you know if this is still the case on 1.7 and if there is any way to
work around this?

Richard

[1]
https://stackoverflow.com/questions/54000276/flink-web-port-can-not-be-configured-correctly-in-yarn-mode

On Thu, Feb 21, 2019 at 3:41 PM Gary Yao  wrote:

> Hi,
>
> Beginning with Flink 1.7, you cannot use the legacy mode anymore [1][2]. I
> am
> currently working on removing references to the legacy mode in the
> documentation [3]. Is there any reason, you cannot use the "new mode"?
>
> Best,
> Gary
>
> [1] https://flink.apache.org/news/2018/11/30/release-1.7.0.html
> [2] https://issues.apache.org/jira/browse/FLINK-10392
> [3] https://issues.apache.org/jira/browse/FLINK-11713
>
> On Mon, Feb 18, 2019 at 12:00 PM Richard Deurwaarder 
> wrote:
>
>> Hello,
>>
>> I am trying to upgrade our job from flink 1.4.2 to 1.7.1 but I keep
>> running into timeouts after submitting the job.
>>
>> The flink job runs on our hadoop cluster and starts using Yarn.
>>
>> Relevant config options seem to be:
>>
>> jobmanager.rpc.port: 55501
>>
>> recovery.jobmanager.port: 55502
>>
>> yarn.application-master.port: 55503
>>
>> blob.server.port: 55504
>>
>>
>> I've seen the following behavior:
>>   - Using the same flink-conf.yaml as we used in 1.4.2: 1.5.6 / 1.6.3 /
>> 1.7.1 all versions timeout while 1.4.2 works.
>>   - Using 1.5.6 with "mode: legacy" (to switch off flip-6) works
>>   - Using 1.7.1 with "mode: legacy" gives timeout (I assume this option
>> was removed but the documentation is outdated?
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy
>> )
>>
>> When the timeout happens I get the following stacktrace:
>>
>> INFO class java.time.Instant does not contain a getter for field seconds
>> 2019-02-18T10:16:56.815+01:00
>> INFO class com.bol.fin_hdp.cm1.domain.Cm1Transportable does not contain
>> a getter for field globalId 2019-02-18T10:16:56.815+01:00
>> INFO Submitting job 5af931bcef395a78b5af2b97e92dcffe (detached: false).
>> 2019-02-18T10:16:57.182+01:00
>> INFO 
>> 2019-02-18T10:29:27.527+01:00
>> INFO The program finished with the following exception:
>> 2019-02-18T10:29:27.564+01:00
>> INFO org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error. 2019-02-18T10:29:27.601+01:00
>> INFO at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>> 2019-02-18T10:29:27.638+01:00
>> INFO at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
>> 2019-02-18T10:29:27.675+01:00
>> INFO at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
>> 2019-02-18T10:29:27.711+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:798)
>> 2019-02-18T10:29:27.747+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:289)
>> 2019-02-18T10:29:27.784+01:00
>> INFO at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
>> 2019-02-18T10:29:27.820+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1035)
>> 2019-02-18T10:29:27.857+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:)
>> 2019-02-18T10:29:27.893+01:00
>> INFO at java.security.AccessController.doPrivileged(Native Method)
>> 2019-02-18T10:29:27.929+01:00
>> INFO at javax.security.auth.Subject.doAs(Subject.java:422)
>> 2019-02-18T10:29:27.968+01:00
>> INFO at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>> 2019-02-18T10:29:28.004+01:00
>> INFO at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> 2019-02-18T10:29:28.040+01:00
>> INFO at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:)
>> 2019-02-18T10:29:28.075+01:00
>> INFO Caused by: java.lang.RuntimeException:
>> org.apache.flink.client.program.ProgramInvocationException: Could not
>> retrieve the execution result. 2019-02-18T10:29:28.110+01:00
>> INFO at
>> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJob(IntervalJobStarter.java:43)
>> 2019-02-18T10:29:28.146+01:00
>> INFO at
>> com.bol.fin_hdp.job.starter.IntervalJobStarter.startJobWithConfig(IntervalJobStarter.java:32)
>> 2019-02-18T10:29:28.182+01:00
>> INFO at com.bol.fin_hdp.Main.main(Main.java:8)
>> 2019-02-18T10:29:28.217+01:00
>> INFO at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 2019-02-18T10:29:28.253+01:00
>> INFO at
>> 

Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-26 Thread Stephan Ewen
Here is the pull request with a draft of the roadmap:
https://github.com/apache/flink-web/pull/178

Best,
Stephan

On Fri, Feb 22, 2019 at 5:18 AM Hequn Cheng  wrote:

> Hi Stephan,
>
> Thanks for summarizing the great roadmap! It is very helpful for users and
> developers to track the direction of Flink.
> +1 for putting the roadmap on the website and updating it per release.
>
> Besides, it would be great if the roadmap could include the UpsertSource
> feature (maybe put it under 'Batch Streaming Unification').
> It was discussed a long time ago [1,2] and is moving forward step by
> step.
> Currently, Flink can only emit upsert results. With the UpsertSource, we
> can make our system a more complete one.
>
> Best, Hequn
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-TABLE-How-to-handle-empty-delete-for-UpsertSource-td23856.html#a23874
> [2] https://issues.apache.org/jira/browse/FLINK-8545
> 
>
>
>
> On Fri, Feb 22, 2019 at 3:31 AM Rong Rong  wrote:
>
>> Hi Stephan,
>>
>> Yes. I completely agree. Jincheng & Jark gave some very valuable
>> feedback and suggestions, and I think we can definitely move the
>> conversation forward to reach a more concrete doc first before we put it
>> into the roadmap. Thanks for reviewing it and driving the roadmap effort!
>>
>> --
>> Rong
>>
>> On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:
>>
>>> Hi Rong Rong!
>>>
>>> I would add the security / kerberos threads to the roadmap. They seem to
>>> be advanced enough in the discussions so that there is clarity about what
>>> will come.
>>>
>>> For the window operator with slicing, I would personally like to see the
>>> discussion advance and have some more clarity and consensus on the feature
>>> before adding it to the roadmap. Not having that in the first version of
>>> the roadmap does not mean there will be no activity. And when the
>>> discussion advances well in the next weeks, we can update the roadmap soon.
>>>
>>> What do you think?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:
>>>
 Hi Stephan,

 Thanks for the clarification, yes I think these issues have already been
 discussed in previous mailing list threads [1,2,3].

 I also agree that updating the "official" roadmap every release is a
 very good idea to avoid frequent updates.
 One question I might've been a bit confused about is: are we suggesting to
 keep one roadmap on the documentation site (e.g. [4]) per release, or
 simply just one most up-to-date roadmap on the main website [5]?
 Just like the release notes in every release, the former will probably
 provide a good tracker for users to look back at previous roadmaps as well,
 I am assuming.

 Thanks,
 Rong

 [1]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
 [2]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
 [3]
 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html

 [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
 [5] https://flink.apache.org/

 On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:

> I think the website is better as well.
>
> I agree with Fabian that the wiki is not so visible, and visibility is
> the main motivation.
> This type of roadmap overview would not be updated by everyone -
> letting committers update the roadmap means the listed threads are
> actually happening at the moment.
>
>
> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
> wrote:
>
>> Hi,
>>
>> I like the idea of putting the roadmap on the website because it is
>> much more visible (and IMO more credible, obligatory) there.
>> However, I share the concerns about frequent updates.
>>
>> I think it would be great to update the "official" roadmap on the
>> website once per release (-bugfix releases), i.e., every three months.
>> We can use the wiki to collect and draft the roadmap for the next
>> update.
>>
>> Best, Fabian
>>
>>
>> Am Do., 14. Feb. 2019 um 11:03 Uhr schrieb Jeff Zhang <
>> zjf...@gmail.com>:
>>
>>> Hi Stephan,
>>>
>>> Thanks for this proposal. It is a good idea to track the roadmap.
>>> One suggestion is that it might be better to put it into a wiki page
>>> first, because it is easier to update the roadmap on the wiki compared to
>>> on the Flink web site. And I guess we may need to update the roadmap very
>>> often at the beginning as there are so many discussions and proposals in
>>> the community recently. We can move it onto the Flink web site later when
>>> we feel it could be nailed down.

Re: StreamingFileSink on EMR

2019-02-26 Thread kb
Hi Bruno,

Thanks for verifying. We are aiming for the same.

Best,
Kevin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Sftp source in flink

2019-02-26 Thread Siew Wai Yow


Hi guys,

Can anyone share experience with an SFTP source? Should I use the Hadoop
SFTPFileSystem, or can I simply use any SFTP Java library in a user-defined source?
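
A minimal sketch of the second option, a user-defined source built on a plain
SFTP client library, is shown below. JSch is used purely as an example client;
the host, credentials and remote path are placeholder assumptions, and fault
tolerance, checkpointing and reconnects are left out.

import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Sketch of a user-defined source that reads one remote file line by line
// over SFTP. Host, credentials and path are placeholders.
public class SftpLineSource extends RichSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        JSch jsch = new JSch();
        Session session = jsch.getSession("user", "sftp.example.com", 22); // placeholders
        session.setPassword("secret");
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
        sftp.connect();

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(sftp.get("/remote/input.csv")))) {
            String line;
            while (running && (line = reader.readLine()) != null) {
                ctx.collect(line);
            }
        } finally {
            sftp.disconnect();
            session.disconnect();
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Such a source would then be attached with env.addSource(new SftpLineSource()).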

Thanks.

Regards,
Yow


Re: StreamingFileSink on EMR

2019-02-26 Thread Bruno Aranda
Hi,

I am having the same issue, but it is related to what Kostas is pointing
out. I was trying to stream to the "s3" scheme and not "hdfs", and then
getting that exception.

I have realised that somehow I need to reach the S3RecoverableWriter, and
found out it is in a different library, "flink-s3-fs-hadoop". Still trying
to figure out how to make it work, though. I am aiming for code such as:

  val sink = StreamingFileSink
  .forBulkFormat(new Path("s3://"), ...)
  .build()
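
For reference, a complete row-format variant of that snippet in Java could look
roughly like the sketch below. The bucket and prefix are placeholders, and it
assumes the flink-s3-fs-hadoop filesystem is on the classpath so that the "s3"
scheme resolves to the recoverable S3 writer; a bulk format would instead pass
a BulkWriter.Factory to forBulkFormat.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Row-format StreamingFileSink writing plain strings to S3.
public class S3SinkExample {

    public static void addS3Sink(DataStream<String> stream) {
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();
        stream.addSink(sink);
    }
}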

Cheers,

Bruno

On Tue, 26 Feb 2019 at 14:59, Kostas Kloudas  wrote:

> Hi Kevin,
>
> I cannot find anything obviously wrong from what you describe.
> Just to eliminate the obvious, you are specifying "hdfs" as the scheme for
> your file path, right?
>
> Cheers,
> Kostas
>
> On Tue, Feb 26, 2019 at 3:35 PM Till Rohrmann 
> wrote:
>
>> Hmm good question, I've pulled in Kostas who worked on the
>> StreamingFileSink. He might be able to tell you more in case that there is
>> some special behaviour wrt the Hadoop file systems.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 3:29 PM kb  wrote:
>>
>>> Hi Till,
>>>
>>> The only potential issue in the path I see is
>>> `/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.29.0.jar`. I double
>>> checked my pom, the project is Hadoop-free. The JM log also shows `INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
>>> version: 2.8.5-amzn-1`.
>>>
>>> Best,
>>> Kevin
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: StreamingFileSink on EMR

2019-02-26 Thread Kostas Kloudas
Hi Kevin,

I cannot find anything obviously wrong from what you describe.
Just to eliminate the obvious, you are specifying "hdfs" as the scheme for
your file path, right?

Cheers,
Kostas

On Tue, Feb 26, 2019 at 3:35 PM Till Rohrmann  wrote:

> Hmm good question, I've pulled in Kostas who worked on the
> StreamingFileSink. He might be able to tell you more in case that there is
> some special behaviour wrt the Hadoop file systems.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 3:29 PM kb  wrote:
>
>> Hi Till,
>>
>> The only potential issue in the path I see is
>> `/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.29.0.jar`. I double
>> checked my pom, the project is Hadoop-free. The JM log also shows `INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
>> version: 2.8.5-amzn-1`.
>>
>> Best,
>> Kevin
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Collapsing watermarks after keyby

2019-02-26 Thread Padarn Wilson
Okay. I think I still must misunderstand something here. I will work on
building a unit test around this, hopefully this clears up my confusion.

Thank you,
Padarn

On Tue, Feb 26, 2019 at 10:28 PM Till Rohrmann  wrote:

> Operators with multiple inputs emit the minimum of the inputs' watermarks
> downstream. In case of a keyBy this means that the watermark is sent to all
> downstream consumers.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson  wrote:
>
>> Just to add: by printing intermediate results I see that I definitely
>> have more than five minutes of data, and by windowing without the session
>> windows I see that event time watermarks do seem to be generated as
>> expected.
>>
>> Thanks for your help and time.
>>
>> Padarn
>>
>> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:
>>
>>> Hi Till,
>>>
>>> I will work on an example, but I’m a little confused by how keyBy and
>>> watermarks work in this case. This documentation says (
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>>> ):
>>>
>>>
>>> Some operators consume multiple input streams; a union, for example, or
>>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>>> operator’s current event time is the minimum of its input streams’ event
>>> times. As its input streams update their event times, so does the operator.
>>>
>>>
>>> This implies to me that the keyBy splits the watermark?
>>>
>>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
>>> wrote:
>>>
 Hi Padarn,

 Flink does not generate watermarks per keys. Atm watermarks are always
 global. Therefore, I would suspect that it is rather a problem with
 generating watermarks at all. Could it be that your input data does not
 span a period longer than 5 minutes and also does not terminate? Another
 problem could be the CountTrigger which should not react to the window's
 end time. The method onEventTime simply returns TriggerResult.CONTINUE and
 I think this will cause the window to not fire. Maybe a working example
 program with example input could be helpful for further debugging.

 Cheers,
 Till

 On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:

> Hi Flink Mailing List,
>
> Long story short - I want to somehow collapse watermarks at an
> operator across keys, so that keys with dragging watermarks do not drag
> behind. Details below:
>
> ---
>
> I have an application in which I want to perform the following sequence
> of steps: Assume my data is made up of data that has: (time, user,
> location, action)
>
> -> Read source
> -> KeyBy (UserId, Location)
> -> EventTimeSessionWindow (5 min gap) - results in (User Location
> Session)
> -> TriggerOnFirst event
> -> KeyBy (Location)
> -> SlidingEventTimeWindow(5min length, 5 second gap)
> -> Count
>
> The end intention is to count the number of unique users in a given
> location - the EventTimeSessionWindow is used to make sure users are only
> counted once.
>
> So I created a custom Trigger, which is the same as CountTrigger, but
> has the following `TriggerResult` function:
>
> @Override
> public TriggerResult onElement(Object element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>   count.add(1L);
>   if (count.get() == maxCount) {
> return TriggerResult.FIRE_AND_PURGE;
>   } else if (count.get() > maxCount) {
> return TriggerResult.PURGE;
>   }
>   return TriggerResult.CONTINUE;
>
> }
>
> But my final SlidingEventTimeWindow does not fire properly. This is
> because (I assume) there are some users with session windows that are not
> closed, and so the watermark for those keys is running behind and so the
> SlidingEventTimeWindow watermark is held back too.
>
> What I feel like I want to achieve is essentially setting the
> watermark of the SlidingEventTimeWindow operator to be the maximum (with
> lateness) of the input keys, rather than the minimum, but I cannot tell if
> this is possible, and if not, what another approach could be.
>
> Thanks,
> Padarn
>



Re: StreamingFileSink on EMR

2019-02-26 Thread Till Rohrmann
Hmm good question, I've pulled in Kostas who worked on the
StreamingFileSink. He might be able to tell you more in case that there is
some special behaviour wrt the Hadoop file systems.

Cheers,
Till

On Tue, Feb 26, 2019 at 3:29 PM kb  wrote:

> Hi Till,
>
> The only potential issue in the path I see is
> `/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.29.0.jar`. I double
> checked my pom, the project is Hadoop-free. The JM log also shows `INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
> version: 2.8.5-amzn-1`.
>
> Best,
> Kevin
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Share broadcast state between multiple operators

2019-02-26 Thread Till Rohrmann
On Tue, Feb 26, 2019 at 3:10 PM Richard Deurwaarder  wrote:

> Hello Till,
>
> So if I understand correctly, when messages get broadcast to multiple
> operators, each operator will execute the processBroadcast() function and
> store the state under a sort of operator scope? Even if they use the same
> MapStateDescriptor?
>
Yes.


> And is the fact that it replicates the state between operators what makes
> the broadcast state different from an Operator state with Union redistribution?
>
The union redistribution is relevant in case of a restart, where every
operator receives the state from all other operators. The individual
operator states can be different. In case of broadcast state every
operator's state will be the same. So there is no union redistribution
needed.
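
To make that concrete, here is a rough sketch of one broadcast stream feeding
two different operators that use the same MapStateDescriptor. The class and
stream names are made up; each operator ends up with its own replicated copy
of the broadcast state.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// One broadcast stream feeding two independent operators. Both operators use
// the same MapStateDescriptor, but each keeps its own copy of the state.
public class BroadcastToTwoOperators {

    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static void wire(DataStream<String> events,
                            DataStream<String> otherEvents,
                            DataStream<String> rules) {
        BroadcastStream<String> broadcastRules = rules.broadcast(RULES);

        events.connect(broadcastRules).process(new RuleApplier("operator-1")).print();
        otherEvents.connect(broadcastRules).process(new RuleApplier("operator-2")).print();
    }

    static class RuleApplier extends BroadcastProcessFunction<String, String, String> {
        private final String name;

        RuleApplier(String name) {
            this.name = name;
        }

        @Override
        public void processElement(String value, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // Read-only access to this operator's copy of the broadcast state.
            String rule = ctx.getBroadcastState(RULES).get("default");
            out.collect(name + ": " + value + " / " + rule);
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx,
                                            Collector<String> out) throws Exception {
            // Each operator updates its own copy of the state.
            ctx.getBroadcastState(RULES).put("default", rule);
        }
    }
}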

Cheers,
Till


>
> Thanks for any clarification, very interesting to learn about :)
>
> Richard
>
> On Tue, Feb 26, 2019 at 11:57 AM Till Rohrmann 
> wrote:
>
>> Hi Richard,
>>
>> Flink does not support sharing state between multiple operators.
>> Technically also the broadcast state is not shared but replicated between
>> subtasks belonging to the same operator. So what you can do is to send the
>> broadcast input to different operators, but they will all keep their own
>> copy of the state.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 25, 2019 at 11:45 AM Richard Deurwaarder 
>> wrote:
>>
>>> Hi All,
>>>
>>> Due to the way our code is structured, we would like to use the
>>> broadcast state at multiple points of our pipeline. So not only share it
>>> between multiple instances of the same operator but also between multiple
>>> operators. See the image below for a simplified example.
>>>
>>> Flink does not seem to have any problems with this at runtime but I
>>> wonder:
>>>
>>>- Is this a good pattern and was it designed with something like
>>>this in mind?
>>>- If we use the same MapStateDescriptor in both operators, does the
>>>state only get stored once? And does it also only get written once?
>>>
>>>
>>> [image: broadcast-state.png]
>>>
>>> Thanks!
>>>
>>


Re: StreamingFileSink on EMR

2019-02-26 Thread kb
Hi Till,

The only potential issue in the path I see is
`/usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.29.0.jar`. I double
checked my pom, the project is Hadoop-free. The JM log also shows `INFO 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  Hadoop
version: 2.8.5-amzn-1`.

Best,
Kevin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Collapsing watermarks after keyby

2019-02-26 Thread Till Rohrmann
Operators with multiple inputs emit the minimum of the inputs' watermarks
downstream. In case of a keyBy this means that the watermark is sent to all
downstream consumers.

Cheers,
Till

On Tue, Feb 26, 2019 at 1:47 PM Padarn Wilson  wrote:

> Just to add: by printing intermediate results I see that I definitely have
> more than five minutes of data, and by windowing without the session
> windows I see that event time watermarks do seem to be generated as
> expected.
>
> Thanks for your help and time.
>
> Padarn
>
> On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:
>
>> Hi Till,
>>
>> I will work on an example, but I’m a little confused by how keyBy and
>> watermarks work in this case. This documentation says (
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
>> ):
>>
>>
>> Some operators consume multiple input streams; a union, for example, or
>> operators following a *keyBy(…)* or *partition(…)* function. Such an
>> operator’s current event time is the minimum of its input streams’ event
>> times. As its input streams update their event times, so does the operator.
>>
>>
>> This implies to me that the keyBy splits the watermark?
>>
>> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
>> wrote:
>>
>>> Hi Padarn,
>>>
>>> Flink does not generate watermarks per keys. Atm watermarks are always
>>> global. Therefore, I would suspect that it is rather a problem with
>>> generating watermarks at all. Could it be that your input data does not
>>> span a period longer than 5 minutes and also does not terminate? Another
>>> problem could be the CountTrigger which should not react to the window's
>>> end time. The method onEventTime simply returns TriggerResult.CONTINUE and
>>> I think this will cause the window to not fire. Maybe a working example
>>> program with example input could be helpful for further debugging.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:
>>>
 Hi Flink Mailing List,

 Long story short - I want to somehow collapse watermarks at an operator
 across keys, so that keys with dragging watermarks do not drag behind.
 Details below:

 ---

 I have an application in which I want to perform the following sequence of
 steps: Assume my data is made up of data that has: (time, user, location,
 action)

 -> Read source
 -> KeyBy (UserId, Location)
 -> EventTimeSessionWindow (5 min gap) - results in (User Location
 Session)
 -> TriggerOnFirst event
 -> KeyBy (Location)
 -> SlidingEventTimeWindow(5min length, 5 second gap)
 -> Count

 The end intention is to count the number of unique users in a given
 location - the EventTimeSessionWindow is used to make sure users are only
 counted once.

 So I created a custom Trigger, which is the same as CountTrigger, but
 has the following `TriggerResult` function:

 @Override
 public TriggerResult onElement(Object element, long timestamp, W window, 
 TriggerContext ctx) throws Exception {
   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
   count.add(1L);
   if (count.get() == maxCount) {
 return TriggerResult.FIRE_AND_PURGE;
   } else if (count.get() > maxCount) {
 return TriggerResult.PURGE;
   }
   return TriggerResult.CONTINUE;

 }

 But my final SlidingEventTimeWindow does not fire properly. This is
 because (I assume) there are some users with session windows that are not
 closed, and so the watermark for those keys is running behind and so the
 SlidingEventTimeWindow watermark is held back too.

 What I feel like I want to achieve is essentially setting the watermark
 of the SlidingEventTimeWindow operator to be the maximum (with lateness) of
 the input keys, rather than the minimum, but I cannot tell if this is
 possible, and if not, what another approach could be.

 Thanks,
 Padarn

>>>


Re: Share broadcast state between multiple operators

2019-02-26 Thread Richard Deurwaarder
Hello Till,

So if I understand correctly, when messages get broadcast to multiple
operators, each operator will execute the processBroadcast() function and
store the state under a sort of operator scope? Even if they use the same
MapStateDescriptor?

And is the fact that it replicates the state between operators what makes
the broadcast state different from an Operator state with Union redistribution?

Thanks for any clarification, very interesting to learn about :)

Richard

On Tue, Feb 26, 2019 at 11:57 AM Till Rohrmann  wrote:

> Hi Richard,
>
> Flink does not support sharing state between multiple operators.
> Technically also the broadcast state is not shared but replicated between
> subtasks belonging to the same operator. So what you can do is to send the
> broadcast input to different operators, but they will all keep their own
> copy of the state.
>
> Cheers,
> Till
>
> On Mon, Feb 25, 2019 at 11:45 AM Richard Deurwaarder 
> wrote:
>
>> Hi All,
>>
>> Due to the way our code is structured, we would like to use the broadcast
>> state at multiple points of our pipeline. So not only share it between
>> multiple instances of the same operator but also between multiple
>> operators. See the image below for a simplified example.
>>
>> Flink does not seem to have any problems with this at runtime but I
>> wonder:
>>
>>- Is this a good pattern and was it designed with something like this
>>in mind?
>>- If we use the same MapStateDescriptor in both operators, does the
>>state only get stored once? And does it also only get written once?
>>
>>
>> [image: broadcast-state.png]
>>
>> Thanks!
>>
>


Re: Collapsing watermarks after keyby

2019-02-26 Thread Padarn Wilson
Just to add: by printing intermediate results I see that I definitely have
more than five minutes of data, and by windowing without the session
windows I see that event time watermarks do seem to be generated as
expected.

Thanks for your help and time.

Padarn

On Tue, 26 Feb 2019 at 8:43 PM, Padarn Wilson  wrote:

> Hi Till,
>
> I will work on an example, but I’m a little confused by how keyBy and
> watermarks work in this case. This documentation says (
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
> ):
>
>
> Some operators consume multiple input streams; a union, for example, or
> operators following a *keyBy(…)* or *partition(…)* function. Such an
> operator’s current event time is the minimum of its input streams’ event
> times. As its input streams update their event times, so does the operator.
>
>
> This implies to me that the keyBy splits the watermark?
>
> On Tue, 26 Feb 2019 at 6:40 PM, Till Rohrmann 
> wrote:
>
>> Hi Padarn,
>>
>> Flink does not generate watermarks per keys. Atm watermarks are always
>> global. Therefore, I would suspect that it is rather a problem with
>> generating watermarks at all. Could it be that your input data does not
>> span a period longer than 5 minutes and also does not terminate? Another
>> problem could be the CountTrigger which should not react to the window's
>> end time. The method onEventTime simply returns TriggerResult.CONTINUE and
>> I think this will cause the window to not fire. Maybe a working example
>> program with example input could be helpful for further debugging.
>>
>> Cheers,
>> Till
>>
>> On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:
>>
>>> Hi Flink Mailing List,
>>>
>>> Long story short - I want to somehow collapse watermarks at an operator
>>> across keys, so that keys with dragging watermarks do not drag behind.
>>> Details below:
>>>
>>> ---
>>>
>>> I have an application in which I want to perform the following sequence of
>>> steps: Assume my data is made up of data that has: (time, user, location,
>>> action)
>>>
>>> -> Read source
>>> -> KeyBy (UserId, Location)
>>> -> EventTimeSessionWindow (5 min gap) - results in (User Location
>>> Session)
>>> -> TriggerOnFirst event
>>> -> KeyBy (Location)
>>> -> SlidingEventTimeWindow(5min length, 5 second gap)
>>> -> Count
>>>
>>> The end intention is to count the number of unique users in a given
>>> location - the EventTimeSessionWindow is used to make sure users are only
>>> counted once.
>>>
>>> So I created a custom Trigger, which is the same as CountTrigger, but
>>> has the following `TriggerResult` function:
>>>
>>> @Override
>>> public TriggerResult onElement(Object element, long timestamp, W window, 
>>> TriggerContext ctx) throws Exception {
>>>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>>>   count.add(1L);
>>>   if (count.get() == maxCount) {
>>> return TriggerResult.FIRE_AND_PURGE;
>>>   } else if (count.get() > maxCount) {
>>> return TriggerResult.PURGE;
>>>   }
>>>   return TriggerResult.CONTINUE;
>>>
>>> }
>>>
>>> But my final SlidingEventTimeWindow does not fire properly. This is
>>> because (I assume) there are some users with session windows that are not
>>> closed, and so the watermark for those keys is running behind and so the
>>> SlidingEventTimeWindow watermark is held back too.
>>>
>>> What I feel like I want to achieve is essentially setting the watermark
>>> of the SlidingEventTimeWindow operator to be the maximum (with lateness) of
>>> the input keys, rather than the minimum, but I cannot tell if this is
>>> possible, and if not, what another approach could be.
>>>
>>> Thanks,
>>> Padarn
>>>
>>


Re: Share broadcast state between multiple operators

2019-02-26 Thread Till Rohrmann
Hi Richard,

Flink does not support sharing state between multiple operators.
Technically also the broadcast state is not shared but replicated between
subtasks belonging to the same operator. So what you can do is to send the
broadcast input to different operators, but they will all keep their own
copy of the state.

Cheers,
Till

On Mon, Feb 25, 2019 at 11:45 AM Richard Deurwaarder 
wrote:

> Hi All,
>
> Due to the way our code is structured, we would like to use the broadcast
> state at multiple points of our pipeline. So not only share it between
> multiple instances of the same operator but also between multiple
> operators. See the image below for a simplified example.
>
> Flink does not seem to have any problems with this at runtime but I wonder:
>
>- Is this a good pattern and was it designed with something like this
>in mind?
>- If we use the same MapStateDescriptor in both operators, does the
>state only get stored once? And does it also only get written once?
>
>
> [image: broadcast-state.png]
>
> Thanks!
>


Re: Flink 1.6.4 signing key file in docker-flink repo?

2019-02-26 Thread Till Rohrmann
Hi William,

where do you get the /KEYS file from? Have you imported the latest KEYS
from here [1]?

[1] https://dist.apache.org/repos/dist/release/flink/KEYS

Cheers,
Till

On Mon, Feb 25, 2019 at 5:16 PM William Saar  wrote:

> Trying to build a new Docker image by replacing 1.6.3 with 1.6.4 in the
> Dockerfile found here ( https://github.com/docker-flink/docker-flink ),
> but it seems to require a new signing key. Is it available somewhere?
>
> Getting
> + wget -nv -O flink.tgz.asc
> https://www.apache.org/dist/flink/flink-1.6.4/flink-1.6.4-bin-scala_2.11.tgz.asc
> 2019-02-25 15:58:20 URL:
> https://www.apache.org/dist/flink/flink-1.6.4/flink-1.6.4-bin-scala_2.11.tgz.asc
> [833/833] -> "flink.tgz.asc" [1]
> + mktemp -d
> + export GNUPGHOME=/tmp/tmp.fuHAgJfHYL
> + gpg --batch --import /KEYS
> gpg: keybox '/tmp/tmp.fuHAgJfHYL/pubring.kbx' created
> key 88BD3F5704D9B832:
> 1 signature not checked due to a missing key
> gpg: /tmp/tmp.fuHAgJfHYL/trustdb.gpg: trustdb created
> gpg: key 88BD3F5704D9B832: public key "Alan Gates (No comment) <
> ga...@yahoo-inc.com>" imported
> gpg: key D5E0A69C0CBAAE9F: public key "Sean Owen (CODE SIGNING KEY) <
> sro...@apache.org>" imported
> key D056856A0410DA0C:
> 1 signature not checked due to a missing key
> gpg: key D056856A0410DA0C: public key "Ted Dunning (for signing Apache
> releases) " imported
> gpg: key EDF4C9583592721E: public key "Henry Saputra (CODE SIGNING KEY) <
> hsapu...@apache.org>" imported
> uid  Owen O'Malley 
> sig!31209E7F13D0C92B9 2010-09-17  [self-signature]
> uid  Owen O'Malley (Code signing) 
> sig!31209E7F13D0C92B9 2010-09-17  [self-signature]
> sig!31209E7F13D0C92B9 2010-02-23  [self-signature]
> sub  73F426934D654583
> sig! 1209E7F13D0C92B9 2010-02-23  [self-signature]
> key 1209E7F13D0C92B9:
> 8 duplicate signatures removed
> 47 signatures not checked due to missing keys
> gpg: key 1209E7F13D0C92B9: public key "Owen O'Malley (Code signing) <
> omal...@apache.org>" imported
> gpg: key 183F6944D9839159: public key "Robert Metzger (CODE SIGNING KEY) <
> rmetz...@apache.org>" imported
> gpg: key DE3E0F4C9D403309: public key "Ufuk Celebi (CODE SIGNING KEY) <
> u...@apache.org>" imported
> gpg: key 2C70877AD675A2E9: public key "Márton Balassi (CODE SIGNING KEY) <
> mbala...@apache.org>" imported
> gpg: key DE976D18C2909CBF: public key "Maximilian Michels "
> imported
> gpg: key 600AD0D034911D5A: public key "Fabian Hueske (CODE SIGNING KEY) <
> fhue...@apache.org>" imported
> gpg: key 6D072D73B065B356: public key "Tzu-Li Tai (CODE SIGNING KEY) <
> tzuli...@apache.org>" imported
> gpg: key A8F4FD97121D7293: public key "Aljoscha Krettek (CODE SIGNING KEY)
> " imported
> gpg: key C2EED7B111D464BA: public key "Chesnay Schepler (CODE SIGNING KEY)
> " imported
> gpg: key F320986D35C33D6A: public key "Tzu-Li Tai (CODE SIGNING KEY) <
> tzuli...@apache.org>" imported
> gpg: key 1F302569A96CFFD5: public key "Till Rohrmann (stsffap) <
> trohrm...@apache.org>" imported
> gpg: key 12DEE3E4D920A98C: public key "Thomas Weise "
> imported
> gpg: Total number processed: 16
> gpg:   imported: 16
> gpg: no ultimately trusted keys found
> + gpg --batch --verify flink.tgz.asc flink.tgz
> gpg: Signature made Fri Feb 15 13:13:29 2019 UTC
> gpg:using RSA key 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E
> gpg: Can't check signature: No public key
>
> Thanks,
> William
>


Re: [Blink] sql client kafka sink failure

2019-02-26 Thread Zhenghua Gao
Try a clean environment (clear the standalone and sql client processes and logs, then reproduce your problem),
and then upload the corresponding standalonesession, taskexecutor, and sql client logs so we can take a look.


On Tue, Feb 26, 2019 at 10:43 AM 张洪涛  wrote:

>
>
> If I put the kafka connector shaded jar under the blink lib directory and then start, there is no
> problem, but passing it via the sql client --jar parameter causes the problem.
>
>
> I tested a few more times and found that the class reported as not found is random.
>
>
> Any suggestions?
>
>
> 2019-02-26 10:36:10,343 ERROR
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.utils.KafkaThread
> - Uncaught exception in kafka-producer-network-thread | producer-1:
> java.lang.NoClassDefFoundError:
> org/apache/flink/kafka011/shaded/org/apache/kafka/common/requests/ProduceResponse$PartitionResponse
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.requests.ProduceResponse.(ProduceResponse.java:107)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.requests.AbstractResponse.getResponse(AbstractResponse.java:55)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.createResponse(NetworkClient.java:569)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:663)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
> at
> org.apache.flink.kafka011.shaded.org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.kafka011.shaded.org.apache.kafka.common.requests.ProduceResponse$PartitionResponse
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
>
>
> On 2019-02-25 14:34:54, "Becket Qin"  wrote:
> >@Kurt,
> >
> >This is expected. It is to prevent conflicts with possible Kafka dependencies in user code.
> >
> >On Mon, Feb 25, 2019 at 10:28 AM Kurt Young  wrote:
> >
> >> Judging from the path, the kafka packages are shaded. Is this expected? @Becket
> >>
> >> Best,
> >> Kurt
> >>
> >>
> >> On Mon, Feb 25, 2019 at 9:56 AM 张洪涛  wrote:
> >>
> >> >
> >> >
> >> > The sql-client.sh startup parameters first include the kafka-related jars on the classpath; in
> >> > addition, the --jar arguments include all the connector jars.
> >> >
> >> >
> >> > These jars are uploaded to the cluster's blob store when the sql client submits the job, but strangely they cannot be found.
> >> >
> >> >
> >> >  00:00:06 /usr/lib/jvm/java-1.8.0-openjdk/bin/java
> >> > -Dlog.file=/bigdata/flink-1.5.1/log/flink-root-sql-client-gpu06.log
> >> >
> -Dlog4j.configuration=file:/bigdata/flink-1.5.1/conf/log4j-cli.properties
> >> > -Dlogback.configurationFile=file:/bigdata/flink-1.5.1/conf/logback.xml
> >> > -classpath
> >> >
> >>
> /bigdata/flink-1.5.1/lib/flink-python_2.11-1.5.1.jar:/bigdata/flink-1.5.1/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/bigdata/flink-1.5.1/lib/log4j-1.2.17.jar:/bigdata/flink-1.5.1/lib/slf4j-log4j12-1.7.7.jar:/bigdata/flink-1.5.1/lib/flink-dist_2.11-1.5.1.jar::/bigdata/hadoop-2.7.5/etc/hadoop::/bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar:/bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-api-jdo-4.2.4.jar:/bigdata/flink-1.5.1/opt/sql-client/javax.jdo-3.2.0-m3.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-core-4.1.17.jar:/bigdata/flink-1.5.1/opt/sql-client/datanucleus-rdbms-4.1.19.jar:/bigdata/flink-1.5.1/opt/sql-client/flink-sql-client-1.5.1.jar
> >> > org.apache.flink.table.client.SqlClient embedded -d
> >> > conf/sql-client-defaults.yaml --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka011/flink-connector-kafka-0.11_2.11-1.5.1-sql-jar.jar
> >> > --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka010/flink-connector-kafka-0.10_2.11-1.5.1-sql-jar.jar
> >> > --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka09/flink-connector-kafka-0.9_2.11-1.5.1-sql-jar.jar
> >> > --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/kafka08/flink-connector-kafka-0.8_2.11-1.5.1.jar
> >> > --jar /bigdata/flink-1.5.1/opt/connectors/flink-hbase_2.11-1.5.1.jar
> >> --jar
> >> >
> >>
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hadoop-compatibility_2.11-1.5.1.jar
> >> > --jar
> >> >
> /bigdata/flink-1.5.1/opt/connectors/flink-connector-hive_2.11-1.5.1.jar
> >> > --jar 

Re: StreamingFileSink on EMR

2019-02-26 Thread Till Rohrmann
Hi Kevin,

could you check what's on the class path of the Flink cluster? You should
see this in the jobmanager.log at the top. It seems as if there is a Hadoop
dependency with a lower version. Flink 1.7 is built against which Hadoop
version? You should make sure that you either use the Hadoop-free version
or the version where the Hadoop version is >= 2.7. Not sure what option EMR
offers here.

Cheers,
Till

On Tue, Feb 26, 2019 at 12:23 AM Bohinski, Kevin (Contractor) <
kevin_bohin...@comcast.com> wrote:

> When running Flink 1.7 on EMR 5.21 using StreamingFileSink we see
> java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are
> only supported for HDFS and for Hadoop version 2.7 or newer. EMR is showing
> Hadoop version 2.8.5. Is anyone else seeing this issue?
>


Re: What are blobstore files and why do they keep filling up /tmp directory?

2019-02-26 Thread Till Rohrmann
Hi Harshith,

the blob store files are necessary to distribute the Flink job in your
cluster. After the job has been completed, they should be cleaned up. Only
in the case of cluster crashes should the clean-up not happen. Since Flink
1.4.2 is no longer actively supported, I would suggest to upgrade to the
latest Flink version and to check whether the problem still occurs.

Cheers,
Till

On Tue, Feb 26, 2019 at 2:48 AM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> We're running Flink on a standalone five node cluster. The /tmp/ directory
> keeps filling with directories starting with blobstore--*. These
> directories are very large (approx 1 GB) and fill up the space very quickly
> and the jobs fail with a "No space left on device" error. The files in these
> directories appear to be some form of binary representation of the jobs
> that are running on the cluster.
>
> What are these files and how do I take care of cleaning them so they don't
> fill up /tmp/ causing jobs to fail?
>
> Flink version: 1.4.2
>
>
>
> Thanks,
>
> Harshith
>


Re: Collapsing watermarks after keyby

2019-02-26 Thread Till Rohrmann
Hi Padarn,

Flink does not generate watermarks per keys. Atm watermarks are always
global. Therefore, I would suspect that it is rather a problem with
generating watermarks at all. Could it be that your input data does not
span a period longer than 5 minutes and also does not terminate? Another
problem could be the CountTrigger which should not react to the window's
end time. The method onEventTime simply returns TriggerResult.CONTINUE and
I think this will cause the window to not fire. Maybe a working example
program with example input could be helpful for further debugging.
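
For illustration only (this mirrors the behaviour of the built-in
EventTimeTrigger and is not a fix prescribed in this thread): a count-based
trigger that additionally registers an event-time timer for the window end, so
that onEventTime can fire once the watermark passes it, could be sketched as
follows. For merging (session) windows a trigger would also need
canMerge()/onMerge(), which is omitted here.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Count-based trigger that also registers an event-time timer for the window
// end, so onEventTime can fire the window when the watermark passes it.
public class CountOrEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrEventTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Fire when the watermark reaches the window end ...
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // ... or as soon as maxCount elements have arrived.
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(stateDesc).clear();
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}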

Cheers,
Till

On Tue, Feb 26, 2019 at 2:53 AM Padarn Wilson  wrote:

> Hi Flink Mailing List,
>
> Long story short - I want to somehow collapse watermarks at an operator
> across keys, so that keys with dragging watermarks do not drag behind.
> Details below:
>
> ---
>
> I have an application in which I want to perform the following sequence of
> steps: Assume my data is made up of data that has: (time, user, location,
> action)
>
> -> Read source
> -> KeyBy (UserId, Location)
> -> EventTimeSessionWindow (5 min gap) - results in (User Location Session)
> -> TriggerOnFirst event
> -> KeyBy (Location)
> -> SlidingEventTimeWindow(5min length, 5 second gap)
> -> Count
>
> The end intention is to count the number of unique users in a given
> location - the EventTimeSessionWindow is used to make sure users are only
> counted once.
>
> So I created a custom Trigger, which is the same as CountTrigger, but has
> the following `TriggerResult` function:
>
> @Override
> public TriggerResult onElement(Object element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>   ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
>   count.add(1L);
>   if (count.get() == maxCount) {
> return TriggerResult.FIRE_AND_PURGE;
>   } else if (count.get() > maxCount) {
> return TriggerResult.PURGE;
>   }
>   return TriggerResult.CONTINUE;
>
> }
>
> But my final SlidingEventTimeWindow does not fire properly. This is
> because (I assume) there are some users with session windows that are not
> closed, and so the watermark for those keys is running behind and so the
> SlidingEventTimeWindow watermark is held back too.
>
> What I feel like I want to achieve is essentially setting the watermark of
> the SlidingEventTimeWindow operator to be the maximum (with lateness) of
> the input keys, rather than the minimum, but I cannot tell if this is
> possible, and if not, what another approach could be.
>
> Thanks,
> Padarn
>


Re: Flink window triggering and timing on connected streams

2019-02-26 Thread Till Rohrmann
Hi Andrew,

if using connected streams (e.g. CoFlatMapFunction or CoMapFunction), then
the watermarks will be synchronized across both inputs. Concretely, you
will always emit the minimum of the watermarks arriving on input channel 1
and 2. Take a look at AbstractStreamOperator.java:773-804.
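
As a small, made-up illustration of that behaviour (the types and names are
placeholders, and it assumes keyed, timestamped inputs): an event-time timer
registered in a function over two connected streams only fires once the
operator's combined watermark, i.e. the minimum over both inputs, has passed
the registered time.

import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Sketch of a function on two connected (keyed) streams. The timer registered
// for the "end session" event fires only when the operator's watermark - the
// minimum of the watermarks of both inputs - passes sessionEndTime, so the
// second input must have progressed at least that far as well.
public class SessionEndFunction extends CoProcessFunction<String, String, String> {

    @Override
    public void processElement1(String endEvent, Context ctx, Collector<String> out)
            throws Exception {
        long sessionEndTime = ctx.timestamp(); // assumes timestamps are assigned upstream
        ctx.timerService().registerEventTimeTimer(sessionEndTime);
    }

    @Override
    public void processElement2(String event, Context ctx, Collector<String> out) {
        // Collect events from the second stream into state here (omitted in this sketch).
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Reached only after both inputs' watermarks have passed 'timestamp'.
        out.collect("session closed at " + timestamp);
    }
}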

Cheers,
Till

On Tue, Feb 26, 2019 at 4:27 AM Andrew Roberts  wrote:

> I’m not sure that approach will work for me, as I have many sessions going
> at the same time which can overlap. Also, I need to be able to have
> sessions time out if they never receive an end event. Do you know directly
> if setting a timer triggers when any timestamp passes that time, or when
> the watermark passes that time?
>
>
> On Feb 25, 2019, at 9:08 PM, Hequn Cheng  wrote:
>
> Hi Andrew,
>
> >  I have an “end session” event that I want to cause the window to fire
> and purge.
> Do you want to fire the window only by the 'end session' event? I see one
> option to solve the problem. You can use a tumbling window (say 5s) and set
> your timestamp to t'+5s each time you receive an 'end session' event in your
> user-defined `AssignerWithPeriodicWatermarks`.
>
> > My understanding is that this is what the trailing watermark is for, and
> that in connected streams, the lowest (earliest) watermark of the input
> streams is what is seen as the watermark downstream.
> Yes, and we can make use of this to make window fires only on 'end
> session' event using the solution above.
>
> Best, Hequn
>
>
> On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts  wrote:
>
>> Hello,
>>
>> I’m trying to implement session windows over a set of connected streams
>> (event time), with some custom triggering behavior. Essentially, I allow
>> very long session gaps, but I have an “end session” event that I want to
>> cause the window to fire and purge. I’m assigning timestamps and watermarks
>> using BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for
>> the watermark. I have things mostly wired up, but I have some confusion
>> about how I can ensure that my streams stay “in sync” relative to time.
>>
>>  Let’s say I am connecting streams A and B. Stream A is where the “end
>> session” event always comes from. If I have a session involving events from
>> time t to t’ in stream A, and then at t’ I get an “end session”, I want to
>> ensure that the window doesn’t fire until stream B has also processed
>> events (added events to the window) up to time t’. My understanding is that
>> this is what the trailing watermark is for, and that in connected streams,
>> the lowest (earliest) watermark of the input streams is what is seen as the
>> watermark downstream.
>>
>> Currently, I’m setting a timer for the current time + 1 when I see my
>> “end event”, with the idea that that timer will fire when the WATERMARK
>> passes that time, i.e., all streams have progressed at least as far as that
>> end event. However, the implementation of EventTimeTrigger doesn’t really
>> look like that’s what’s going on.
>>
>> Can anyone clear up how these concepts interact? Is there a good model
>> for this “session end event” concept that I can take a look at?
>>
>> Thanks,
>>
>> Andrew
>
>