This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new fa2a132dec [ZEPPELIN-5874] polish flink code base (#4553) fa2a132dec is described below commit fa2a132dec28bb1350b8d95ea005b210081b0e70 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Mon Jan 16 09:06:07 2023 +0100 [ZEPPELIN-5874] polish flink code base (#4553) --- .../flink/ApplicationModeExecutionEnvironment.java | 1 - .../apache/zeppelin/flink/FlinkInterpreter.java | 6 ++-- .../apache/zeppelin/flink/FlinkSqlInterpreter.java | 2 +- .../apache/zeppelin/flink/IPyFlinkInterpreter.java | 4 +-- .../java/org/apache/zeppelin/flink/JobManager.java | 34 +++++++++++----------- .../apache/zeppelin/flink/PyFlinkInterpreter.java | 2 +- .../org/apache/zeppelin/flink/TableEnvFactory.java | 24 +++++++-------- .../apache/zeppelin/flink/internal/JarHelper.java | 12 +++----- .../flink/internal/ScalaShellEnvironment.java | 3 -- .../zeppelin/flink/sql/AbstractStreamSqlJob.java | 16 +++++----- .../zeppelin/flink/sql/AppendStreamSqlJob.java | 7 ++--- .../zeppelin/flink/sql/SingleRowStreamSqlJob.java | 10 +++---- .../zeppelin/flink/sql/UpdateStreamSqlJob.java | 12 ++++---- .../apache/zeppelin/spark/PySparkInterpreter.java | 1 + .../apache/zeppelin/spark/SparkInterpreter.java | 9 +++--- .../interpreter/SingleRowInterpreterResult.java | 4 +-- .../SingleRowInterpreterResultTest.java | 9 +++--- 17 files changed, 75 insertions(+), 81 deletions(-) diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java index 0a96734e86..52ba6fe033 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/ApplicationModeExecutionEnvironment.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; -import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.zeppelin.flink.internal.FlinkILoop; import java.io.File; diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 63c69a004d..a10128f94c 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -53,7 +53,7 @@ public class FlinkInterpreter extends Interpreter { private String extractScalaVersion() throws InterpreterException { String scalaVersionString = scala.util.Properties.versionString(); - LOGGER.info("Using Scala: " + scalaVersionString); + LOGGER.info("Using Scala: {}", scalaVersionString); if (scalaVersionString.contains("version 2.11")) { return "2.11"; } else if (scalaVersionString.contains("version 2.12")) { @@ -87,7 +87,7 @@ public class FlinkInterpreter extends Interpreter { String scalaVersion = extractScalaVersion(); ClassLoader flinkScalaClassLoader = FlinkScalaInterpreter.class.getClassLoader(); String innerIntpClassName = innerInterpreterClassMap.get(scalaVersion); - Class clazz = Class.forName(innerIntpClassName); + Class<?> clazz = Class.forName(innerIntpClassName); return (FlinkScalaInterpreter) clazz.getConstructor(Properties.class, URLClassLoader.class) @@ -104,7 +104,7 @@ public class FlinkInterpreter extends Interpreter { @Override public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException { - LOGGER.debug("Interpret code: " + st); + LOGGER.debug("Interpret code: {}", st); this.z.setInterpreterContext(context); this.z.setGui(context.getGui()); this.z.setNoteGui(context.getNoteGui()); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java index 8c55765a86..1145da4dda 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java @@ -46,7 +46,7 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter { @Override protected InterpreterResult internalInterpret(String st, InterpreterContext context) throws InterpreterException { - LOGGER.debug("Interpret code: " + st); + LOGGER.debug("Interpret code: {}", st); // set ClassLoader of current Thread to be the ClassLoader of Flink scala-shell, // otherwise codegen will fail to find classes defined in scala-shell ClassLoader originClassLoader = Thread.currentThread().getContextClassLoader(); diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java index 817d13fa3e..1bc61821f8 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/IPyFlinkInterpreter.java @@ -101,7 +101,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { InterpreterResult result = super.internalInterpret("intp.resetClassLoaderInPythonThread()", context); if (result.code() != InterpreterResult.Code.SUCCESS) { - LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString()); + LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result); } } } @@ -112,7 +112,7 @@ public class IPyFlinkInterpreter extends IPythonInterpreter { flinkInterpreter.cancel(context); super.cancel(context); } - + /** * Called by python process. */ diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java index 33a75dac40..61fdf4db80 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/JobManager.java @@ -38,7 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class JobManager { - private static Logger LOGGER = LoggerFactory.getLogger(JobManager.class); + private static final Logger LOGGER = LoggerFactory.getLogger(JobManager.class); public static final String LATEST_CHECKPOINT_PATH = "latest_checkpoint_path"; public static final String SAVEPOINT_PATH = "savepoint_path"; public static final String RESUME_FROM_SAVEPOINT = "resumeFromSavepoint"; @@ -62,7 +62,7 @@ public class JobManager { LOGGER.info("Creating JobManager at flinkWebUrl: {}, displayedFlinkWebUrl: {}", flinkWebUrl, displayedFlinkWebUrl); } - + public void addJob(InterpreterContext context, JobClient jobClient) { String paragraphId = context.getParagraphId(); JobClient previousJobClient = this.jobs.put(paragraphId, jobClient); @@ -83,18 +83,18 @@ public class JobManager { } public void removeJob(String paragraphId) { - LOGGER.info("Remove job in paragraph: " + paragraphId); + LOGGER.info("Remove job in paragraph: {}", paragraphId); JobClient jobClient = this.jobs.remove(paragraphId); if (jobClient == null) { - LOGGER.warn("Unable to remove job, because no job is associated with paragraph: " - + paragraphId); + LOGGER.warn("Unable to remove job, because no job is associated with paragraph: {}", + paragraphId); return; } FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.remove(jobClient.getJobID()); if (jobProgressPoller == null) { - LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: " - + paragraphId); + LOGGER.warn("Unable to remove poller, because no poller is associated with paragraph: {}", + paragraphId); return; } @@ -114,21 +114,21 @@ public class JobManager { infos.put("paraId", context.getParagraphId()); context.getIntpEventClient().onParaInfosReceived(infos); } else { - LOGGER.warn("No job is associated with paragraph: " + context.getParagraphId()); + LOGGER.warn("No job is associated with paragraph: {}", context.getParagraphId()); } } public int getJobProgress(String paragraphId) { JobClient jobClient = this.jobs.get(paragraphId); if (jobClient == null) { - LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId + - ", because no job is associated with this paragraph"); + LOGGER.warn("Unable to get job progress for paragraph: {}" + + ", because no job is associated with this paragraph", paragraphId); return 0; } FlinkJobProgressPoller jobProgressPoller = this.jobProgressPollerMap.get(jobClient.getJobID()); if (jobProgressPoller == null) { - LOGGER.warn("Unable to get job progress for paragraph: " + paragraphId + - ", because no job progress is associated with this jobId: " + jobClient.getJobID()); + LOGGER.warn("Unable to get job progress for paragraph: {}" + + ", because no job progress is associated with this jobId: {}", paragraphId, jobClient.getJobID()); return 0; } return jobProgressPoller.getProgress(); @@ -174,8 +174,8 @@ public class JobManager { throw new InterpreterException(errorMessage, e); } finally { if (cancelled) { - LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: " - + context.getParagraphId()); + LOGGER.info("Cancelling is successful, remove the associated FlinkJobProgressPoller of paragraph: {}", + context.getParagraphId()); FlinkJobProgressPoller jobProgressPoller = jobProgressPollerMap.remove(jobClient.getJobID()); if (jobProgressPoller != null) { jobProgressPoller.cancel(); @@ -231,11 +231,11 @@ public class JobManager { totalTasks += vertex.getInt("parallelism"); finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED"); } - LOGGER.debug("Total tasks:" + totalTasks); - LOGGER.debug("Finished tasks:" + finishedTasks); + LOGGER.debug("Total tasks:{}", totalTasks); + LOGGER.debug("Finished tasks:{}", finishedTasks); if (finishedTasks != 0) { this.progress = finishedTasks * 100 / totalTasks; - LOGGER.debug("Progress: " + this.progress); + LOGGER.debug("Progress: {}", this.progress); } String jobState = rootNode.getObject().getString("state"); if (jobState.equalsIgnoreCase("finished")) { diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java index 3a33cd7c4b..df203b71b8 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/PyFlinkInterpreter.java @@ -123,7 +123,7 @@ public class PyFlinkInterpreter extends PythonInterpreter { if (useIPython() || (!useIPython() && getPythonProcessLauncher().isRunning())) { InterpreterResult result = super.interpret("intp.resetClassLoaderInPythonThread()", context); if (result.code() != InterpreterResult.Code.SUCCESS) { - LOGGER.warn("Fail to resetClassLoaderInPythonThread: " + result.toString()); + LOGGER.warn("Fail to resetClassLoaderInPythonThread: {}", result); } } } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java index 711993578d..5ec2de96eb 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/TableEnvFactory.java @@ -44,7 +44,7 @@ import java.lang.reflect.Constructor; */ public class TableEnvFactory { - private static Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class); + private static final Logger LOGGER = LoggerFactory.getLogger(TableEnvFactory.class); private FlinkVersion flinkVersion; private FlinkShims flinkShims; @@ -99,10 +99,10 @@ public class TableEnvFactory { public TableEnvironment createScalaFlinkBatchTableEnvironment() { try { - Class clazz = Class + Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.BatchTableEnvironmentImpl"); - Constructor constructor = clazz + Constructor<?> constructor = clazz .getConstructor( org.apache.flink.api.scala.ExecutionEnvironment.class, TableConfig.class, @@ -121,7 +121,7 @@ public class TableEnvFactory { Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl"); - Constructor con = clazz.getConstructor( + Constructor<?> con = clazz.getConstructor( ExecutionEnvironment.class, TableConfig.class, CatalogManager.class, @@ -146,10 +146,10 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = Class + Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl"); try { - Constructor constructor = clazz + Constructor<?> constructor = clazz .getConstructor( CatalogManager.class, ModuleManager.class, @@ -169,7 +169,7 @@ public class TableEnvFactory { settings.isStreamingMode()); } catch (NoSuchMethodException e) { // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor constructor = clazz + Constructor<?> constructor = clazz .getConstructor( CatalogManager.class, ModuleManager.class, @@ -203,11 +203,11 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = Class + Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); try { - Constructor constructor = clazz + Constructor<?> constructor = clazz .getConstructor( CatalogManager.class, ModuleManager.class, @@ -262,10 +262,10 @@ public class TableEnvFactory { Planner planner = (Planner) pair.left; Executor executor = (Executor) pair.right; - Class clazz = Class + Class<?> clazz = Class .forName("org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl"); try { - Constructor constructor = clazz.getConstructor( + Constructor<?> constructor = clazz.getConstructor( CatalogManager.class, ModuleManager.class, FunctionCatalog.class, @@ -285,7 +285,7 @@ public class TableEnvFactory { settings.isStreamingMode()); } catch (NoSuchMethodException e) { // Flink 1.11.1 change the constructor signature, FLINK-18419 - Constructor constructor = clazz.getConstructor( + Constructor<?> constructor = clazz.getConstructor( CatalogManager.class, ModuleManager.class, FunctionCatalog.class, diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java index 07b5033238..648fe51e0f 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/JarHelper.java @@ -72,16 +72,13 @@ public class JarHelper { } mDestJarName = destJar.getCanonicalPath(); - FileOutputStream fout = new FileOutputStream(destJar); - JarOutputStream jout = new JarOutputStream(fout); - // jout.setLevel(0); - try { + try ( + FileOutputStream fout = new FileOutputStream(destJar); + JarOutputStream jout = new JarOutputStream(fout)) { + // jout.setLevel(0); jarDir(dirOrFile2Jar, jout, null); } catch (IOException ioe) { throw ioe; - } finally { - jout.close(); - fout.close(); } } @@ -89,7 +86,6 @@ public class JarHelper { * Unjars a given jar file into a given directory. */ public void unjarDir(File jarFile, File destDir) throws IOException { - BufferedOutputStream dest = null; FileInputStream fis = new FileInputStream(jarFile); unjar(fis, destDir); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java index ebd9f1d2fb..c8288b96f4 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/internal/ScalaShellEnvironment.java @@ -19,11 +19,9 @@ package org.apache.zeppelin.flink.internal; -import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.ConfigUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.util.JarUtils; @@ -34,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This class is copied from flink project, the reason is that flink scala shell only supports diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java index 1ee26143cc..e1e1642cbd 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java @@ -117,7 +117,7 @@ public abstract class AbstractStreamSqlJob { this.schema = removeTimeAttributes(flinkShims, table.getSchema()); checkTableSchema(schema); - LOGGER.info("ResultTable Schema: " + this.schema); + LOGGER.info("ResultTable Schema: {}", this.schema); final RowTypeInfo outputType = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()); @@ -132,8 +132,8 @@ public abstract class AbstractStreamSqlJob { serializer); // create table sink // pass binding address and port such that sink knows where to send to - LOGGER.debug("Collecting data at address: " + iterator.getBindAddress() + - ":" + iterator.getPort()); + LOGGER.debug("Collecting data at address: {}:{}", + iterator.getBindAddress(), iterator.getPort()); RetractStreamTableSink collectTableSink = (RetractStreamTableSink) flinkShims.getCollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); // new CollectStreamTableSink(iterator.getBindAddress(), iterator.getPort(), serializer); @@ -149,16 +149,16 @@ public abstract class AbstractStreamSqlJob { ResultRetrievalThread retrievalThread = new ResultRetrievalThread(refreshScheduler); retrievalThread.start(); - LOGGER.info("Run job: " + tableName + ", parallelism: " + parallelism); + LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism); String jobName = context.getStringLocalProperty("jobName", tableName); table.executeInsert(tableName).await(); - LOGGER.info("Flink Job is finished, jobName: " + jobName); + LOGGER.info("Flink Job is finished, jobName: {}", jobName); // wait for retrieve thread consume all data LOGGER.info("Waiting for retrieve thread to be done"); retrievalThread.join(); refresh(context); String finalResult = buildResult(); - LOGGER.info("Final Result: " + finalResult); + LOGGER.info("Final Result: {}", finalResult); return finalResult; } catch (Exception e) { LOGGER.error("Fail to run stream sql job", e); @@ -229,7 +229,7 @@ public abstract class AbstractStreamSqlJob { isRunning = false; LOGGER.info("ResultRetrieval Thread is done, isRunning={}, hasNext={}", isRunning, iterator.hasNext()); - LOGGER.info("Final Result: " + buildResult()); + LOGGER.info("Final Result: {}", buildResult()); refreshExecutorService.shutdownNow(); } @@ -255,7 +255,7 @@ public abstract class AbstractStreamSqlJob { if (!enableToRefresh) { resultLock.wait(); } - LOGGER.debug("Refresh result of paragraph: " + context.getParagraphId()); + LOGGER.debug("Refresh result of paragraph: {}", context.getParagraphId()); refresh(context); } } catch (Exception e) { diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java index c6791ea6d3..705f9979b8 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/AppendStreamSqlJob.java @@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Timestamp; -import java.time.temporal.TemporalField; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -70,7 +69,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { @Override protected void processInsert(Row row) { - LOGGER.debug("processInsert: " + row.toString()); + LOGGER.debug("processInsert: {}", row.toString()); materializedTable.add(row); } @@ -99,7 +98,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { return f1.compareTo(f2); }); - if (materializedTable.size() != 0) { + if (!materializedTable.isEmpty()) { // Timestamp type before/after Flink 1.14 has changed. if (flinkShims.getFlinkVersion().isAfterFlink114()) { java.time.LocalDateTime ldt = ((java.time.LocalDateTime) materializedTable @@ -133,7 +132,7 @@ public class AppendStreamSqlJob extends AbstractStreamSqlJob { String result = buildResult(); context.out.write(result); context.out.flush(); - LOGGER.debug("Refresh with data: " + result); + LOGGER.debug("Refresh with data: {}", result); } catch (IOException e) { LOGGER.error("Fail to refresh data", e); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java index 902ff42889..5f81e89549 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/SingleRowStreamSqlJob.java @@ -25,7 +25,6 @@ import org.apache.zeppelin.flink.FlinkShims; import org.apache.zeppelin.flink.JobManager; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; -import org.apache.zeppelin.tabledata.TableDataUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,7 @@ import java.util.List; public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { - private static Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleRowStreamSqlJob.class); private Row latestRow; private String template; @@ -56,8 +55,9 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { return "single"; } + @Override protected void processInsert(Row row) { - LOGGER.debug("processInsert: " + row.toString()); + LOGGER.debug("processInsert: {}", row); latestRow = row; } @@ -95,8 +95,8 @@ public class SingleRowStreamSqlJob extends AbstractStreamSqlJob { singleRowResult.pushAngularObjects(); } - private List rowToList(Row row) { - List list = new ArrayList<>(); + private List<Object> rowToList(Row row) { + List<Object> list = new ArrayList<>(); for (int i = 0; i < row.getArity(); i++) { list.add(row.getField(i)); } diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java index 44105b9a74..9a7c8b436c 100644 --- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java +++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/sql/UpdateStreamSqlJob.java @@ -35,7 +35,7 @@ import java.util.List; public class UpdateStreamSqlJob extends AbstractStreamSqlJob { - private static Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(UpdateStreamSqlJob.class); private List<Row> materializedTable = new ArrayList<>(); private List<Row> lastSnapshot = new ArrayList<>(); @@ -54,19 +54,21 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob { return "retract"; } + @Override protected void processInsert(Row row) { enableToRefresh = true; resultLock.notify(); - LOGGER.debug("processInsert: " + row.toString()); + LOGGER.debug("processInsert: {}", row); materializedTable.add(row); } + @Override protected void processDelete(Row row) { enableToRefresh = false; - LOGGER.debug("processDelete: " + row.toString()); + LOGGER.debug("processDelete: {}", row); for (int i = 0; i < materializedTable.size(); i++) { if (flinkShims.rowEquals(materializedTable.get(i), row)) { - LOGGER.debug("real processDelete: " + row.toString()); + LOGGER.debug("real processDelete: {}", row); materializedTable.remove(i); break; } @@ -103,7 +105,7 @@ public class UpdateStreamSqlJob extends AbstractStreamSqlJob { String result = buildResult(); context.out.write(result); context.out.flush(); - LOGGER.debug("Refresh with data: " + result); + LOGGER.debug("Refresh with data: {}", result); this.lastSnapshot.clear(); for (Row row : materializedTable) { this.lastSnapshot.add(row); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 737bef8f4b..ee891a62d4 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -192,6 +192,7 @@ public class PySparkInterpreter extends PythonInterpreter { return "python"; } + @Override public ZeppelinContext getZeppelinContext() { if (sparkInterpreter != null) { return sparkInterpreter.getZeppelinContext(); diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 035924e603..62dfb1dd1a 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -68,7 +68,7 @@ public class SparkInterpreter extends AbstractInterpreter { } private static AtomicInteger SESSION_NUM = new AtomicInteger(0); - private static Class innerInterpreterClazz; + private static Class<?> innerInterpreterClazz; private AbstractSparkScalaInterpreter innerInterpreter; private Map<String, String> innerInterpreterClassMap = new HashMap<>(); private SparkContext sc; @@ -171,8 +171,8 @@ public class SparkInterpreter extends AbstractInterpreter { File scalaJarFolder = new File(zeppelinHome + "/interpreter/spark/scala-" + scalaVersion); List<URL> urls = new ArrayList<>(); for (File file : scalaJarFolder.listFiles()) { - LOGGER.debug("Add file " + file.getAbsolutePath() + " to classpath of spark scala interpreter: " - + scalaJarFolder); + LOGGER.debug("Add file {} to classpath of spark scala interpreter: {}", file.getAbsolutePath(), + scalaJarFolder); urls.add(file.toURI().toURL()); } scalaInterpreterClassLoader = new URLClassLoader(urls.toArray(new URL[0]), @@ -232,6 +232,7 @@ public class SparkInterpreter extends AbstractInterpreter { return innerInterpreter.getProgress(context); } + @Override public ZeppelinContext getZeppelinContext() { if (this.innerInterpreter == null) { throw new RuntimeException("innerInterpreterContext is null"); @@ -276,7 +277,7 @@ public class SparkInterpreter extends AbstractInterpreter { } else { scalaVersionString = scala.util.Properties.versionString(); } - LOGGER.info("Using Scala: " + scalaVersionString); + LOGGER.info("Using Scala: {}", scalaVersionString); if (StringUtils.isEmpty(scalaVersionString)) { throw new InterpreterException("Scala Version is empty"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java index db975b2964..e83f8290eb 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResult.java @@ -29,10 +29,10 @@ import java.util.List; public class SingleRowInterpreterResult { private String template; - private List values; + private List<Object> values; private InterpreterContext context; - public SingleRowInterpreterResult(List values, String template, InterpreterContext context) { + public SingleRowInterpreterResult(List<Object> values, String template, InterpreterContext context) { this.values = values; this.template = template; this.context = context; diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java index b169bc0350..a83c685fa6 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/SingleRowInterpreterResultTest.java @@ -17,19 +17,18 @@ package org.apache.zeppelin.interpreter; -import org.junit.Test; +import static org.junit.Assert.assertEquals; -import java.io.Serializable; import java.util.Arrays; import java.util.List; -import static org.junit.Assert.assertEquals; +import org.junit.Test; public class SingleRowInterpreterResultTest { @Test public void testHtml() { - List<Serializable> list = Arrays.asList("2020-01-01", 10); + List<Object> list = Arrays.asList("2020-01-01", 10); String template = "Total count:{1} for {0}"; InterpreterContext context = InterpreterContext.builder().build(); SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context); @@ -39,7 +38,7 @@ public class SingleRowInterpreterResultTest { @Test public void testAngular() { - List<Serializable> list = Arrays.asList("2020-01-01", 10); + List<Object> list = Arrays.asList("2020-01-01", 10); String template = "Total count:{1} for {0}"; InterpreterContext context = InterpreterContext.builder().build(); SingleRowInterpreterResult singleRowInterpreterResult = new SingleRowInterpreterResult(list, template, context);