This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c97813cd5 [lake/iceberg] Support pass hadoop configuration (#1541)
c97813cd5 is described below
commit c97813cd5e35cfdcd9b0c3d83cdb1918585a8569
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 19 10:56:25 2025 +0800
[lake/iceberg] Support pass hadoop configuration (#1541)
---
fluss-lake/fluss-lake-iceberg/pom.xml | 27 +++++
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 13 +-
.../fluss/lake/iceberg/conf/HadoopConfSerde.java | 48 ++++++++
.../fluss/lake/iceberg/conf/HadoopUtils.java | 135 +++++++++++++++++++++
.../lake/iceberg/conf/IcebergConfiguration.java | 98 +++++++++++++++
.../iceberg/tiering/IcebergCatalogProvider.java | 10 +-
.../src/main/resources/META-INF/NOTICE | 6 +
.../iceberg/conf/IcebergConfigurationTest.java | 70 +++++++++++
8 files changed, 393 insertions(+), 14 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 52f37d072..274de4919 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -33,6 +33,16 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-common</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
@@ -48,10 +58,21 @@
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-orc</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-bundled-guava</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-common</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
@@ -136,7 +157,13 @@
<configuration>
<artifactSet>
<includes>
+
<include>org.apache.iceberg:iceberg-api</include>
<include>org.apache.iceberg:iceberg-core</include>
+
<include>org.apache.iceberg:iceberg-data</include>
+
<include>org.apache.iceberg:iceberg-common</include>
+
<include>org.apache.iceberg:iceberg-parquet</include>
+
<include>org.apache.iceberg:iceberg-orc</include>
+
<include>org.apache.iceberg:iceberg-bundled-guava</include>
</includes>
</artifactSet>
<filters>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index dbc087c1f..46eb9f65d 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.lake.iceberg;
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
@@ -53,6 +54,8 @@ import static
org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {
+ public static final String ICEBERG_CATALOG_DEFAULT_NAME =
"fluss-iceberg-catalog";
+
private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
static {
@@ -81,15 +84,9 @@ public class IcebergLakeCatalog implements LakeCatalog {
private Catalog createIcebergCatalog(Configuration configuration) {
Map<String, String> icebergProps = configuration.toMap();
-
- String catalogName = icebergProps.getOrDefault("name",
"fluss-iceberg-catalog");
-
+ String catalogName = icebergProps.getOrDefault("name",
ICEBERG_CATALOG_DEFAULT_NAME);
return buildIcebergCatalog(
- catalogName,
- icebergProps, // todo: current is an empty configuration, need
to init from env or
- // fluss
- // configurations
- new org.apache.hadoop.conf.Configuration());
+ catalogName, icebergProps,
IcebergConfiguration.from(configuration).get());
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
new file mode 100644
index 000000000..ffb26c56d
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopConfSerde.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** Serde of {@link Configuration} . */
+public class HadoopConfSerde {
+
+ public static void writeObject(ObjectOutputStream out, Object hadoopConf)
throws IOException {
+ try {
+ Configuration conf = (Configuration) hadoopConf;
+ conf.write(out);
+ } catch (IOException e) {
+ throw new IOException("Failed to serialize Hadoop Configuration: "
+ e.getMessage(), e);
+ }
+ }
+
+ public static Configuration readObject(ObjectInputStream in) throws
IOException {
+ try {
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.readFields(in);
+ return hadoopConf;
+ } catch (IOException e) {
+ throw new IOException(
+ "Failed to deserialize Hadoop Configuration: " +
e.getMessage(), e);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
new file mode 100644
index 000000000..f93cdd7df
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/HadoopUtils.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import com.alibaba.fluss.config.ConfigBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+/**
+ * Utility class for working with Hadoop-related classes. This should only be
used if Hadoop is on
+ * the classpath.
+ *
+ * <p>Note: Copied from HadoopUtils of fluss-fs-hadoop module
+ */
+public class HadoopUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HadoopUtils.class);
+
+ /** The prefixes that Fluss adds to the Hadoop config for iceberg. */
+ private static final String[] FLUSS_CONFIG_PREFIXES = {"iceberg.hadoop."};
+
+ public static Configuration getHadoopConfiguration(
+ com.alibaba.fluss.config.Configuration flussConfiguration) {
+
+ // Instantiate an HdfsConfiguration to load the hdfs-site.xml and
hdfs-default.xml
+ // from the classpath
+
+ Configuration result = new HdfsConfiguration();
+ boolean foundHadoopConfiguration = false;
+
+ // We need to load both core-site.xml and hdfs-site.xml to determine
the default fs path and
+ // the hdfs configuration.
+ // The properties of a newly added resource will override the ones in
previous resources, so
+ // a configuration
+ // file with higher priority should be added later.
+
+ // Approach 1: HADOOP_HOME environment variables
+ String[] possibleHadoopConfPaths = new String[2];
+
+ final String hadoopHome = System.getenv("HADOOP_HOME");
+ if (hadoopHome != null) {
+ LOG.debug("Searching Hadoop configuration files in HADOOP_HOME:
{}", hadoopHome);
+ possibleHadoopConfPaths[0] = hadoopHome + "/conf";
+ possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop
2.2
+ }
+
+ for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+ if (possibleHadoopConfPath != null) {
+ foundHadoopConfiguration = addHadoopConfIfFound(result,
possibleHadoopConfPath);
+ }
+ }
+
+ // Approach 2: HADOOP_CONF_DIR environment variable
+ String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+ if (hadoopConfDir != null) {
+ LOG.debug("Searching Hadoop configuration files in
HADOOP_CONF_DIR: {}", hadoopConfDir);
+ foundHadoopConfiguration =
+ addHadoopConfIfFound(result, hadoopConfDir) ||
foundHadoopConfiguration;
+ }
+
+ // Approach 3: Fluss configuration
+ // add all configuration key with prefix 'iceberg.hadoop.' in fluss
conf to hadoop conf
+ for (String key : flussConfiguration.keySet()) {
+ for (String prefix : FLUSS_CONFIG_PREFIXES) {
+ if (key.startsWith(prefix)) {
+ String newKey = key.substring(prefix.length());
+ String value =
+ flussConfiguration.getString(
+
ConfigBuilder.key(key).stringType().noDefaultValue(), null);
+ result.set(newKey, value);
+ LOG.debug(
+ "Adding Fluss config entry for {} as {}={} to
Hadoop config",
+ key,
+ newKey,
+ value);
+ foundHadoopConfiguration = true;
+ }
+ }
+ }
+
+ if (!foundHadoopConfiguration) {
+ LOG.warn(
+ "Could not find Hadoop configuration via any of the
supported methods "
+ + "(Fluss configuration, environment variables).");
+ }
+
+ return result;
+ }
+
+ /**
+ * Search Hadoop configuration files in the given path, and add them to
the configuration if
+ * found.
+ */
+ private static boolean addHadoopConfIfFound(
+ Configuration configuration, String possibleHadoopConfPath) {
+ boolean foundHadoopConfiguration = false;
+ if (new File(possibleHadoopConfPath).exists()) {
+ if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+ configuration.addResource(
+ new org.apache.hadoop.fs.Path(possibleHadoopConfPath +
"/core-site.xml"));
+ LOG.debug(
+ "Adding {}/core-site.xml to hadoop configuration",
possibleHadoopConfPath);
+ foundHadoopConfiguration = true;
+ }
+ if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+ configuration.addResource(
+ new org.apache.hadoop.fs.Path(possibleHadoopConfPath +
"/hdfs-site.xml"));
+ LOG.debug(
+ "Adding {}/hdfs-site.xml to hadoop configuration",
possibleHadoopConfPath);
+ foundHadoopConfiguration = true;
+ }
+ }
+ return foundHadoopConfiguration;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
new file mode 100644
index 000000000..b2b277745
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfiguration.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.config.Configuration;
+
+import org.apache.iceberg.common.DynClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Wraps the hadoop configuration used to configure {@link
org.apache.iceberg.catalog.Catalog} if
+ * hadoop related classes is available.
+ *
+ * <p>It don't declare Hadoop configuration explicitly for some catalogs won't
need hadoop
+ * configuration. For these catalogs, it won't throw class not found
exception. It set the conf to
+ * null if no hadoop dependencies are found. It's fine to use null for the
catalogs don't require
+ * Hadoop configuration.
+ *
+ * <p>For the catalogs require Hadoop configuration, hadoop related class not
found exception will
+ * be thrown which guides users to add hadoop related classes.
+ */
+public class IcebergConfiguration implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(IcebergConfiguration.class);
+
+ private transient Object conf;
+
+ @VisibleForTesting
+ protected IcebergConfiguration(Object conf) {
+ this.conf = conf;
+ }
+
+ public static IcebergConfiguration from(Configuration flussConfig) {
+ return new IcebergConfiguration(loadHadoopConfig(flussConfig));
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ if (conf == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ HadoopConfSerde.writeObject(out, conf);
+ }
+ }
+
+ private void readObject(ObjectInputStream in) throws
ClassNotFoundException, IOException {
+ in.defaultReadObject();
+ boolean configIsNotNull = in.readBoolean();
+ if (configIsNotNull) {
+ conf = HadoopConfSerde.readObject(in);
+ } else {
+ conf = null;
+ }
+ }
+
+ private static Object loadHadoopConfig(Configuration flussConfig) {
+ Class<?> configClass =
+ DynClasses.builder()
+ .impl("org.apache.hadoop.hdfs.HdfsConfiguration")
+ .impl("org.apache.hadoop.conf.Configuration")
+ .orNull()
+ .build();
+
+ if (configClass == null) {
+ LOG.info("Hadoop not found on classpath, not creating Hadoop
config");
+ return null;
+ }
+
+ return HadoopUtils.getHadoopConfiguration(flussConfig);
+ }
+
+ public Object get() {
+ return conf;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
index 1c484743a..b27f94e02 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -18,12 +18,14 @@
package com.alibaba.fluss.lake.iceberg.tiering;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.conf.IcebergConfiguration;
import org.apache.iceberg.catalog.Catalog;
import java.io.Serializable;
import java.util.Map;
+import static
com.alibaba.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
/** A provider for Iceberg catalog. */
@@ -38,13 +40,9 @@ public class IcebergCatalogProvider implements Serializable {
public Catalog get() {
Map<String, String> icebergProps = icebergConfig.toMap();
- String catalogName = icebergProps.getOrDefault("name",
"fluss-iceberg-catalog");
+ String catalogName = icebergProps.getOrDefault("name",
ICEBERG_CATALOG_DEFAULT_NAME);
return buildIcebergCatalog(
- catalogName,
- icebergProps,
- // todo: current is an empty configuration, need to init from
env or fluss
- // configurations
- new org.apache.hadoop.conf.Configuration());
+ catalogName, icebergProps,
IcebergConfiguration.from(icebergConfig).get());
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
index 20b34f83d..edfd8ca2e 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
+++ b/fluss-lake/fluss-lake-iceberg/src/main/resources/META-INF/NOTICE
@@ -7,3 +7,9 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.iceberg:iceberg-core:1.4.3
+- org.apache.iceberg:iceberg-api:1.4.3
+- org.apache.iceberg:iceberg-bundled-guava:1.4.3
+- org.apache.iceberg:iceberg-common:1.4.3
+- org.apache.iceberg:iceberg-data:1.4.3
+- org.apache.iceberg:iceberg-orc:1.4.3
+- org.apache.iceberg:iceberg-parquet:1.4.3
\ No newline at end of file
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
new file mode 100644
index 000000000..749ae86ab
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/conf/IcebergConfigurationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link IcebergConfiguration}. */
+class IcebergConfigurationTest {
+
+ @Test
+ void testSerde() throws Exception {
+ // test when conf is null
+ IcebergConfiguration conf = new IcebergConfiguration(null);
+
+ byte[] data = serialize(conf);
+ IcebergConfiguration gotConf = deserialize(data);
+ assertThat(gotConf.get()).isNull();
+
+ // test when conf is not null
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set("k1", "v1");
+ hadoopConf.set("k2", "v2");
+ conf = new IcebergConfiguration(hadoopConf);
+ data = serialize(conf);
+ gotConf = deserialize(data);
+ Configuration gotHadoopConf = (Configuration) gotConf.get();
+ assertThat(gotHadoopConf.get("k1")).isEqualTo("v1");
+ assertThat(gotHadoopConf.get("k2")).isEqualTo("v2");
+ }
+
+ private byte[] serialize(IcebergConfiguration conf) throws IOException {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(conf);
+ return bos.toByteArray();
+ }
+ }
+
+ private IcebergConfiguration deserialize(byte[] data) throws Exception {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ ObjectInputStream ois = new ObjectInputStream(bis)) {
+ return (IcebergConfiguration) ois.readObject();
+ }
+ }
+}