[jira] [Created] (FLINK-7618) Add BINARY supported in FlinkTypeFactory
sunjincheng created FLINK-7618:
-------------------------------

Summary: Add BINARY supported in FlinkTypeFactory
Key: FLINK-7618
URL: https://issues.apache.org/jira/browse/FLINK-7618
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: sunjincheng
Assignee: sunjincheng

We will get the following exception when we deal with the BINARY type:

{code}
org.apache.flink.table.api.TableException: Type is not supported: BINARY
	at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
	at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:377)
	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:741)
	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
	at org.apache.calcite.rex.RexCall.accept(RexCall.java:104)
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
{code}
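A minimal sketch of the missing mapping, assuming Calcite's BINARY/VARBINARY should be represented as Flink's primitive byte-array TypeInformation; the helper object below is illustrative only and does not reproduce the actual FlinkTypeFactory pattern match:

{code}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}

object BinaryTypeMapping {

  // Hypothetical counterpart of the case that is missing from
  // FlinkTypeFactory.toTypeInfo: map BINARY/VARBINARY to byte[].
  def toTypeInfo(sqlTypeName: SqlTypeName): Option[TypeInformation[_]] = sqlTypeName match {
    case SqlTypeName.BINARY | SqlTypeName.VARBINARY =>
      Some(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
    case _ =>
      None // everything else keeps its existing mapping (or the TableException)
  }
}
{code}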
Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code
Hi XiangWei,

the problem is that the LocalFlinkMiniCluster can no longer be used in combination with a RemoteExecutionEnvironment. The reason is that the LocalFlinkMiniCluster now uses an internal leader election service and assigns leader ids to its components. Since this is an internal service, it is not possible to retrieve this information as it is with the ZooKeeper-based leader election services.

Long story short, the Flink Scala shell currently does not work with a LocalFlinkMiniCluster and would have to be fixed to work properly together with a local execution environment. Until then, I recommend starting a local standalone cluster and letting the code run there.

Cheers,
Till

On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang wrote:

> dear all,
>
> Below is the code I execute:
>
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration;
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
>   override def isOpen: Boolean = {
>     isRunning.get()
>   }
>
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>     config.toMap.toMap.foreach(println)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>     val localCluster = new LocalFlinkMiniCluster(config, false)
>     localCluster.start(true)
>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>     println(s"Starting local Flink cluster (host: localhost, port: ${localCluster.getLeaderRPCPort}).\n")
>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
>
>   /**
>    * Start flink cluster and create interpreter
>    */
>   override def open: Unit = {
>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>     out = new ByteBufOutputStream(outBuf)
>     in = new ByteBufInputStream(outBuf)
>     // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>     val (host, port, localCluster) = startLocalMiniCluster()
>     this.cluster = localCluster
>     val conf = cluster.configuration
>     println(s"Connecting to Flink cluster (host:$host, port:$port)...")
>     flinkILoop = new FlinkILoop(host, port, conf, None)
>     val settings = new Settings()
>     settings.usejavacp.value = true
>     settings.Yreplsync.value = true
>     flinkILoop.settings_$eq(settings)
>     flinkILoop.createInterpreter()
>     imain = flinkILoop.intp
>     FlinkInterpreter.ourClassloader = imain.classLoader
>     val benv = flinkILoop.scalaBenv
>     val senv = flinkILoop.scalaSenv
>     benv.getConfig.disableSysoutLogging()
>     senv.getConfig.disableSysoutLogging()
>     // import libraries
>     imain.interpret("import scala.tools.nsc.io._")
>     // imain.interpret("import Properties.userHome")
>     imain.interpret("import scala.compat.Platform.EOL")
>     imain.interpret("import org.apache.flink.api.scala._")
>     imain.interpret("import org.apache.flink.api.common.functions._")
>     isRunning.set(true)
>   }
>
>   override def i
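Till's suggested workaround, sketched minimally: start a standalone cluster (e.g. via bin/start-cluster.sh) and point a remote environment at it. The host, port, and jar path below are placeholders, not values from this thread:

{code}
import org.apache.flink.api.scala._

object RemoteEnvExample {
  def main(args: Array[String]): Unit = {
    // Placeholders: point these at the JobManager of the standalone cluster
    // and at the jar that contains the user code to be shipped.
    val env = ExecutionEnvironment.createRemoteEnvironment(
      "localhost", 6123, "/path/to/user-code.jar")

    env.fromElements(1, 2, 3)
      .map(_ * 2)
      .print() // triggers execution on the remote cluster
  }
}
{code}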
[jira] [Created] (FLINK-7619) Improve abstraction in AbstractAsyncIOCallable to better fit
Stefan Richter created FLINK-7619:
----------------------------------

Summary: Improve abstraction in AbstractAsyncIOCallable to better fit
Key: FLINK-7619
URL: https://issues.apache.org/jira/browse/FLINK-7619
Project: Flink
Issue Type: Improvement
Components: State Backends, Checkpointing
Reporter: Stefan Richter
Assignee: Stefan Richter
Priority: Minor

The abstraction of AbstractAsyncIOCallable does not fit too well with today's needs in checkpointing backends. Originally, backends were assumed to only open one stream that is managed by the abstraction. In fact, concrete implementations always extended that in practice. We can redo this in a way that allows more resources to be managed by the abstraction.
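A rough design sketch of the direction described above, assuming the abstraction should own a registry of closeable resources rather than a single stream; the class and method names are invented for illustration and are not the actual AbstractAsyncIOCallable API:

{code}
import java.io.Closeable
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical sketch (not the actual Flink class): an async callable that can
// register any number of resources, all of which are closed on cancellation,
// instead of managing exactly one stream.
abstract class MultiResourceAsyncCallable[T] {

  private val resources = new ConcurrentLinkedQueue[Closeable]()
  @volatile private var stopped = false

  /** Registers a resource; closes it immediately if the operation was already stopped. */
  protected def registerResource[R <: Closeable](resource: R): R = {
    resources.add(resource)
    if (stopped) {
      resource.close()
    }
    resource
  }

  /** The actual asynchronous work, free to open several streams or files. */
  protected def performOperation(): T

  def call(): T = performOperation()

  /** Cancels the operation and closes everything that was registered. */
  def stop(): Unit = {
    stopped = true
    var r = resources.poll()
    while (r != null) {
      r.close()
      r = resources.poll()
    }
  }
}
{code}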
[jira] [Created] (FLINK-7620) Supports custom optimization phase
godfrey he created FLINK-7620:
------------------------------

Summary: Supports custom optimization phase
Key: FLINK-7620
URL: https://issues.apache.org/jira/browse/FLINK-7620
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: godfrey he
Assignee: godfrey he

Currently, the optimization phases are hardcoded in {{BatchTableEnvironment}} and {{StreamTableEnvironment}}. It would be better if users could define the optimization phases, and the rules in each phase, as needed.
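One possible shape for a user-defined optimization phase, sketched under the assumption that a phase is essentially a named rule set plus a plan transformation; the trait and the HepPlanner-based default below are hypothetical, not existing Table API:

{code}
import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.plan.hep.{HepPlanner, HepProgramBuilder}
import org.apache.calcite.rel.RelNode

// Hypothetical design sketch of a pluggable optimization phase.
trait OptimizationPhase {
  def name: String
  def rules: Seq[RelOptRule]

  // Default: apply this phase's rules with Calcite's heuristic planner.
  def optimize(input: RelNode): RelNode = {
    val builder = new HepProgramBuilder()
    rules.foreach(builder.addRuleInstance)
    val planner = new HepPlanner(builder.build())
    planner.setRoot(input)
    planner.findBestExp()
  }
}
{code}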
[jira] [Created] (FLINK-7621) Fix inconsistency of CaseSensitive Configuration
Ruidong Li created FLINK-7621:
------------------------------

Summary: Fix inconsistency of CaseSensitive Configuration
Key: FLINK-7621
URL: https://issues.apache.org/jira/browse/FLINK-7621
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Ruidong Li
Assignee: Ruidong Li

The default case-sensitivity configuration of Calcite is {{Lex.JAVA}}, which is different from the Table API's.
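For reference, a minimal sketch of how case sensitivity is configured on the Calcite side; which of these settings the Table API should align with is exactly what this issue is about:

{code}
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser

object CaseSensitivityExample {
  // Lex.JAVA keeps identifiers as written and matches them case-sensitively.
  val javaLexConfig: SqlParser.Config = SqlParser.configBuilder()
    .setLex(Lex.JAVA)
    .build()

  // Case sensitivity can also be set independently of the lexical convention.
  val caseInsensitiveConfig: SqlParser.Config = SqlParser.configBuilder()
    .setCaseSensitive(false)
    .build()
}
{code}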
[jira] [Created] (FLINK-7622) Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client
Tzu-Li (Gordon) Tai created FLINK-7622:
---------------------------------------

Summary: Respect local KPL queue size in FlinkKinesisProducer when adding records to KPL client
Key: FLINK-7622
URL: https://issues.apache.org/jira/browse/FLINK-7622
Project: Flink
Issue Type: Improvement
Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai

This issue was brought to discussion by [~sthm] offline.

Currently, records are added to the Kinesis KPL producer client without checking the number of outstanding records within the local KPL queue. This basically ignores backpressure when producing to Kinesis through KPL and can therefore exhaust system resources.

We should respect {{producer.getOutstandingRecordsCount()}} as a measure of backpressure, and propagate backpressure upstream by blocking further sink invocations when some threshold of outstanding record count is exceeded. The recommended threshold [1] seems to be 10,000.
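A minimal sketch of the proposed backpressure handling; the threshold default, the helper class, and the exact place it would be called from inside FlinkKinesisProducer are assumptions, not the actual connector code:

{code}
import com.amazonaws.services.kinesis.producer.KinesisProducer

// Sketch: block the sink thread until the local KPL queue has drained below
// the configured limit, which propagates backpressure upstream.
class KplBackpressureGuard(producer: KinesisProducer, queueLimit: Int = 10000) {

  /** Called before handing the next record to the KPL client. */
  def enforceQueueLimit(): Unit = {
    while (producer.getOutstandingRecordsCount() > queueLimit) {
      // Ask the KPL client to send what it has buffered, then back off briefly.
      producer.flush()
      Thread.sleep(1)
    }
  }
}
{code}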
[jira] [Created] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state
Aljoscha Krettek created FLINK-7623:
------------------------------------

Summary: Detecting whether an operator is restored doesn't work with chained state
Key: FLINK-7623
URL: https://issues.apache.org/jira/browse/FLINK-7623
Project: Flink
Issue Type: Bug
Components: DataStream API, State Backends, Checkpointing
Affects Versions: 1.3.2, 1.4.0
Reporter: Aljoscha Krettek
Priority: Blocker
Fix For: 1.4.0, 1.3.3

Originally reported on the ML: https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E

If we have a chain of operators where multiple of them have operator state, detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) does not work correctly. It's best exemplified using this minimal example where both the source and the flatMap have state:

{code}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env
    .addSource(new MaSource()).uid("source-1")
    .flatMap(new MaFlatMap()).uid("flatMap-1");

env.execute("testing");
{code}

If I do a savepoint with these UIDs, then change "source-1" to "source-2" and restore from the savepoint, {{context.isRestored()}} still reports {{true}} for the source.
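For context, a stub showing where the flag in question is read; MaSource from the report is not included in the issue, so the following is only an illustrative CheckpointedFunction, not the reporter's code:

{code}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction

class MaSourceStub extends SourceFunction[String] with CheckpointedFunction {

  @volatile private var running = true
  private var state: ListState[String] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    state = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[String]("source-state", classOf[String]))
    // After changing the uid from "source-1" to "source-2" this should be
    // false for the source, but the issue reports it still returns true
    // when another operator in the same chain was restored.
    if (context.isRestored) {
      // ... re-initialize from the restored state
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // ... write the current elements/offsets into `state`
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      // ... emit elements
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running = false
}
{code}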
[jira] [Created] (FLINK-7624) Add kafka-topic for "KafkaProducer" metrics
Hai Zhou created FLINK-7624:
----------------------------

Summary: Add kafka-topic for "KafkaProducer" metrics
Key: FLINK-7624
URL: https://issues.apache.org/jira/browse/FLINK-7624
Project: Flink
Issue Type: Bug
Components: Metrics
Reporter: Hai Zhou
Assignee: Hai Zhou
Fix For: 1.4.0

Currently, metrics in the "KafkaProducer" MetricGroup look like this:

{code}
localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.record-queue-time-avg
{code}

The metric name in the "KafkaProducer" group does not contain the kafka-topic name, so if the job writes data to two different Kafka sinks, these metrics cannot be distinguished. I propose to modify the above metric name as follows, with the topic name inserted:

{code}
localhost.taskmanager.dc4092a96ea4e54ecdbd13b9a5c209b2.Flink Streaming Job.Sink--MTKafkaProducer08.0.KafkaProducer.<kafka-topic>.record-queue-time-avg
{code}

Best,
Hai Zhou
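A sketch of how the extra topic level could be produced with Flink's metric group API, assuming the connector has the sink's MetricGroup and the topic name at hand; the group and metric names mirror the example above but are otherwise assumptions:

{code}
import org.apache.flink.metrics.{Gauge, MetricGroup}

object KafkaProducerMetrics {
  def register(sinkMetricGroup: MetricGroup, topic: String, queueTimeAvg: () => Double): Unit = {
    sinkMetricGroup
      .addGroup("KafkaProducer")
      .addGroup(topic) // extra level keeps metrics from different topics distinguishable
      .gauge[Double, Gauge[Double]]("record-queue-time-avg", new Gauge[Double] {
        override def getValue: Double = queueTimeAvg()
      })
  }
}
{code}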
[jira] [Created] (FLINK-7625) typo in docs metrics sections
Hai Zhou created FLINK-7625:
----------------------------

Summary: typo in docs metrics sections
Key: FLINK-7625
URL: https://issues.apache.org/jira/browse/FLINK-7625
Project: Flink
Issue Type: Bug
Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
Fix For: 1.4.0, 1.3.3

In the metrics docs, for the infix {{Status.JVM.Memory}}, the metric listed as *Memory.Heap.Used* should be changed to *Heap.Used*.
Re: Question about Flink's savepoint
Hi,

What is the source you're using in your job, and what filesystem (if any) is it writing to?

Best,
Aljoscha

> On 5. Sep 2017, at 03:06, Mu Kong wrote:
>
> Hi all,
>
> I have some questions about the experience I had with savepoints.
> Last night I found my Flink cluster's memory usage seemed weird, so I decided to:
>
> 1. create a savepoint for the running job (there was only one job running at the time)
> 2. then cancel the job from the web UI
> 3. and restart the cluster
>
> When I tried to resume the job with that savepoint, there was a
> "Truncate did not truncate to right length. Should be 11757 is 56383."
> exception.
> Because a savepoint is also created every day at 4 a.m., after I failed to run the job with the savepoint
> I created before cancelling the job, I tried to use the 4 a.m. savepoint instead, and it seemed to work well.
>
> Then this morning, I noticed that data was lost for the time between cancelling the job and resuming it.
>
> I thought that if I run the job with the savepoint created at 4 a.m., it should start to process data from
> 4 a.m., or am I missing something here?
>
> Also, I didn't add a uid to the addSource() function. Maybe when I restarted the cluster the auto-generated
> id changed, and that might be the reason why the recovery didn't go well?
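On the last point, a minimal sketch of assigning explicit uids so that auto-generated operator IDs cannot change between restarts; the source and transformations are placeholders, not the job from this thread:

{code}
import org.apache.flink.streaming.api.scala._

object UidExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .socketTextStream("localhost", 9999) // placeholder source
      .uid("my-source")                    // stable ID instead of an auto-generated one
      .map(_.toUpperCase)
      .uid("my-map")
      .print()

    env.execute("uid-example")
  }
}
{code}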
[jira] [Created] (FLINK-7626) Add some metric description about checkpoints
Hai Zhou created FLINK-7626:
----------------------------

Summary: Add some metric description about checkpoints
Key: FLINK-7626
URL: https://issues.apache.org/jira/browse/FLINK-7626
Project: Flink
Issue Type: Bug
Components: Documentation, Metrics
Affects Versions: 1.3.2
Reporter: Hai Zhou
Assignee: Hai Zhou
Fix For: 1.4.0, 1.3.3

Add some metric descriptions in the "Debugging & Monitoring / Metrics" part of the docs:

{noformat}
// Number of total checkpoints (in progress, completed, failed).
totalNumberOfCheckpoints

// Number of in progress checkpoints.
numberOfInProgressCheckpoints

// Number of successfully completed checkpoints.
numberOfCompletedCheckpoints

// Number of failed checkpoints.
numberOfFailedCheckpoints

// Timestamp when the checkpoint was restored at the coordinator.
lastCheckpointRestoreTimestamp

// Buffered bytes during alignment over all subtasks.
lastCheckpointAlignmentBuffered
{noformat}
[jira] [Created] (FLINK-7627) SingleElementIterable should implement with Serializable
Hequn Cheng created FLINK-7627:
-------------------------------

Summary: SingleElementIterable should implement with Serializable
Key: FLINK-7627
URL: https://issues.apache.org/jira/browse/FLINK-7627
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng

{{SingleElementIterable}} is used to merge accumulators, and it should be serializable considering that it will be serialized when doing checkpoints.
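A sketch of the proposed change, assuming it amounts to adding the Serializable marker; the real SingleElementIterable in flink-table differs in detail, this only illustrates the fix:

{code}
// Illustrative only: an Iterable holding a single (mutable) element that is
// safe to serialize, e.g. as part of checkpointed operator state.
class SingleElementIterable[T] extends java.lang.Iterable[T] with Serializable {

  var element: T = _

  override def iterator(): java.util.Iterator[T] = new java.util.Iterator[T] with Serializable {
    private var hasNextFlag = true

    override def hasNext: Boolean = hasNextFlag

    override def next(): T = {
      hasNextFlag = false
      element
    }
  }
}
{code}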