[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/1638


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156832490
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * This command will try to change the status of the segment from "streaming" to "streaming finish"
+ */
+case class CarbonAlterTableFinishStreaming(
+    dbName: Option[String],
+    tableName: String)
+  extends DataCommand {
--- End diff --

fixed


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156834294
  
--- Diff: 
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 ---
@@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
     }
   }
 
+  /**
+   * change the status of the segment from "streaming" to "streaming finish"
+   */
+  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
+    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
+    try {
+      if (lock.lockWithRetries()) {
+        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+            LockUsage.STREAMING_LOCK);
+        try {
+          if (streamingLock.lockWithRetries()) {
+            LoadMetadataDetails[] details =
+                SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            boolean updated = false;
+            for (LoadMetadataDetails detail : details) {
+              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+                detail.setLoadEndTime(System.currentTimeMillis());
+                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+                updated = true;
+              }
+            }
+            if (updated) {
+              CarbonTablePath tablePath =
+                  CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+              SegmentStatusManager.writeLoadDetailsIntoFile(
+                  tablePath.getTableStatusFilePath(), details);
+            }
+          } else {
+            String msg = "Failed to finish streaming, because streaming is locked for table " +
+                carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+            LOGGER.error(msg);
+            throw new Exception(msg);
+          }
+        } finally {
+          if (streamingLock.unlock()) {
+            LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
+                .getDatabaseName() + "." + carbonTable.getTableName());
+          } else {
+            LOGGER.error("Unable to unlock Table lock for table " +
+                carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+                " during streaming finished");
+          }
+        }
+      } else {
+        String msg = "Failed to acquire table status lock of " +
+            carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+        LOGGER.error(msg);
+        throw new Exception(msg);
--- End diff --

fixed


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156834271
  
--- Diff: 
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 ---
@@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
     }
   }
 
+  /**
+   * change the status of the segment from "streaming" to "streaming finish"
+   */
+  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
+    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
+    try {
+      if (lock.lockWithRetries()) {
+        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+            LockUsage.STREAMING_LOCK);
--- End diff --

fixed


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156832564
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -129,6 +129,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 CarbonAlterTableCompactionCommand(altertablemodel)
 }
 
+  protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
--- End diff --

fixed


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156599142
  
--- Diff: 
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 ---
@@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
     }
   }
 
+  /**
+   * change the status of the segment from "streaming" to "streaming finish"
+   */
+  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
+    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
+    try {
+      if (lock.lockWithRetries()) {
+        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+            LockUsage.STREAMING_LOCK);
--- End diff --

This lock should be acquired first, because the handoff flow acquires it 
first.
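
A minimal sketch of the ordering requested above, under stated assumptions: 
java.util.concurrent.locks.ReentrantLock stands in for CarbonData's 
ICarbonLock, and the class name LockOrderSketch, the ORDER list, and the 
one-second timeout are hypothetical and not part of the pull request. It 
only illustrates taking the streaming lock before the table status lock, 
and releasing each lock only if it was actually acquired.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in: ReentrantLock replaces CarbonData's ICarbonLock here.
class LockOrderSketch {
  static final ReentrantLock STREAMING_LOCK = new ReentrantLock();
  static final ReentrantLock TABLE_STATUS_LOCK = new ReentrantLock();
  // Records the acquisition order so that it can be inspected afterwards.
  static final List<String> ORDER = new ArrayList<>();

  static void finishStreaming() throws IOException, InterruptedException {
    // Outer lock: streaming lock, the one the reviewer says handoff takes first.
    if (!STREAMING_LOCK.tryLock(1, TimeUnit.SECONDS)) {
      throw new IOException("Failed to acquire streaming lock");
    }
    try {
      ORDER.add("streaming");
      // Inner lock: table status lock, taken only while the streaming lock is held.
      if (!TABLE_STATUS_LOCK.tryLock(1, TimeUnit.SECONDS)) {
        throw new IOException("Failed to acquire table status lock");
      }
      try {
        ORDER.add("table_status");
        // The segment status update would happen here.
      } finally {
        TABLE_STATUS_LOCK.unlock();
      }
    } finally {
      STREAMING_LOCK.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    finishStreaming();
    System.out.println(ORDER);
  }
}
```

If every code path takes the two locks in the same order, two flows cannot 
each hold one lock while waiting for the other.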


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156597691
  
--- Diff: 
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
 ---
@@ -180,6 +182,70 @@ public static String close(CarbonTable table, String segmentId)
     }
   }
 
+  /**
+   * change the status of the segment from "streaming" to "streaming finish"
+   */
+  public static void finishStreaming(CarbonTable carbonTable) throws Exception {
+    ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+        LockUsage.TABLE_STATUS_LOCK);
+    try {
+      if (lock.lockWithRetries()) {
+        ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+            carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+            LockUsage.STREAMING_LOCK);
+        try {
+          if (streamingLock.lockWithRetries()) {
+            LoadMetadataDetails[] details =
+                SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+            boolean updated = false;
+            for (LoadMetadataDetails detail : details) {
+              if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+                detail.setLoadEndTime(System.currentTimeMillis());
+                detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+                updated = true;
+              }
+            }
+            if (updated) {
+              CarbonTablePath tablePath =
+                  CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+              SegmentStatusManager.writeLoadDetailsIntoFile(
+                  tablePath.getTableStatusFilePath(), details);
+            }
+          } else {
+            String msg = "Failed to finish streaming, because streaming is locked for table " +
+                carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+            LOGGER.error(msg);
+            throw new Exception(msg);
+          }
+        } finally {
+          if (streamingLock.unlock()) {
+            LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable
+                .getDatabaseName() + "." + carbonTable.getTableName());
+          } else {
+            LOGGER.error("Unable to unlock Table lock for table " +
+                carbonTable.getDatabaseName() + "." + carbonTable.getTableName() +
+                " during streaming finished");
+          }
+        }
+      } else {
+        String msg = "Failed to acquire table status lock of " +
+            carbonTable.getDatabaseName() + "." + carbonTable.getTableName();
+        LOGGER.error(msg);
+        throw new Exception(msg);
--- End diff --

Change to IOException.
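
One way to read this suggestion, sketched under assumptions: the method 
declares and throws java.io.IOException instead of the generic Exception, 
so callers can catch the lock failure specifically. The class 
LockFailureSketch, the helper requireLock, and its boolean parameter (a 
stand-in for the result of lockWithRetries()) are hypothetical and not 
taken from the pull request.

```java
import java.io.IOException;

class LockFailureSketch {
  // Hypothetical helper: 'acquired' stands in for the result of lockWithRetries().
  static void requireLock(boolean acquired, String tableName) throws IOException {
    if (!acquired) {
      // A narrower checked exception than Exception, as the review comment requests.
      throw new IOException("Failed to acquire table status lock of " + tableName);
    }
  }

  public static void main(String[] args) {
    try {
      requireLock(false, "default.t1");
    } catch (IOException e) {
      // The caller handles only the I/O-style failure, not every Exception.
      System.out.println(e.getMessage());
    }
  }
}
```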


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156597163
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * This command will try to change the status of the segment from "streaming" to "streaming finish"
+ */
+case class CarbonAlterTableFinishStreaming(
+    dbName: Option[String],
+    tableName: String)
+  extends DataCommand {
--- End diff --

This should be a metadata command.


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156596322
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -129,6 +129,12 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 CarbonAlterTableCompactionCommand(altertablemodel)
 }
 
+  protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] =
--- End diff --

Add a comment describing the syntax.


---


[GitHub] carbondata pull request #1638: [CARBONDATA-1879][Streaming] Support alter ta...

2017-12-13 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/1638#discussion_r156596190
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.DataCommand
+
+import org.apache.carbondata.streaming.segment.StreamSegment
+
+/**
+ * This command will try to change the status of the segment from "streaming" to "streaming finish"
+ */
+case class CarbonAlterTableFinishStreaming(
+    dbName: Option[String],
+    tableName: String)
+  extends DataCommand {
--- End diff --

This is not a data command.


---