Transaction Support in Cloudera Operational Database (COD)

What is CDP Operational Database (COD)

CDP Operational Database enables developers to quickly build future-proof applications that are architected to handle data evolution. It helps developers automate and simplify database management with capabilities like auto-scale, and is fully integrated with Cloudera Data Platform (CDP). For more information and to get started with COD, refer to Getting Started with Cloudera Data Platform Operational Database (COD).

Background

We have divided the “Transaction Support in Cloudera Operational Database (COD)” blog into two parts.

  • In this first post, we are covering the overview and usages of transaction support in COD.
  • In the second, we are demonstrating how to use transactions in your COD environment with a step-by-step example. See how to use transactions in COD.

An overview of transaction support in COD

Transactions are a sequence of one or more changes in a database that must be completed in an order, or canceled to ensure integrity and consistency.

The transaction support in COD enables you to perform complex distributed transactions and run atomic cross-row and cross-table database operations. The atomic database operations ensure that your database operations must either be completed or terminated.

COD supports Apache OMID (Optimistically Transaction Management In Datastores) transactional framework that allows big data applications to execute ACID transactionsadhering to the ACID properties of atomicity, consistency, isolation and durabilityon COD tables. OMID provides lock-free transactional support on top of HBase with snapshot isolation guarantee. OMID enables big data applications to benefit from the best of both worlds: the scalability provided by NoSQL datastores such as HBase, and the concurrency and atomicity provided by transaction processing systems.

How COD manages transactions

When multiple transactions are happening simultaneously at different terminals, COD ensures either the HBase tables are updated for each transaction end to end, marking the transaction as completed, or terminates the transaction and HBase tables are not updated. COD archives this transaction management using OMID, which is a transactional processing service, along with HBase and Phoenix. 

COD also manages relevant configurations required to support transactions so that you can use transactions in applications without any additional effort.

Figure1: OMID client view

COD automatically performs all the steps to manage Phoenix transactions. These steps are described in Annexure1.

How to use transactions with different applications

You can use COD transactions in streaming applications or OLTP (Online Transaction Processing) applications as well as batch-oriented Spark applications.

For more details on deploying transaction support on COD, see How to Use Transactions on COD

The following are the different ways and scenarios through which you can use COD transactions. 

1 (a): Phoenix thick client and thin client (using SQLLine command line) : 

// create transactional table

0: jdbc:phoenix:> create table bankaccount(customer_id varchar primary key,name varchar, balance double) transactional=true;

No rows affected (2.287 seconds)

// Initial data population

0: jdbc:phoenix:> upsert into bankaccount values('CU001', 'foo', 100.0);

1 row affected (0.017 seconds)

0: jdbc:phoenix:> upsert into bankaccount values('CU002', ' baa', 100.0);

1 row affected (0.015 seconds)

0: jdbc:phoenix:> select * from bankaccount;

+-------------+------+---------+

| CUSTOMER_ID | NAME | BALANCE |

+-------------+------+---------+

| CU001       | foo  | 100.0   |

| CU002       |  baa | 100.0   |

+-------------+------+---------+

// Auto commit off

0: jdbc:phoenix:> !autocommit off

Autocommit status: false

// Starts transaction 1 to transfer 50 from CU001 to CU002

0: jdbc:phoenix:> upsert into bankaccount(customer_id, balance) select customer_id, balance - 50 from bankaccount where customer_id = 'CU001';

1 row affected (0.075 seconds)

0: jdbc:phoenix:> upsert into bankaccount(customer_id, balance) select customer_id, balance + 50 from bankaccount where customer_id = 'CU002';

1 row affected (0.021 seconds)

0: jdbc:phoenix:> !commit

Commit complete (0.044 seconds)

0: jdbc:phoenix:> select * from bankaccount;

+-------------+------+---------+

| CUSTOMER_ID | NAME | BALANCE |

+-------------+------+---------+

| CU001       | foo  | 50.0    |

| CU002       |  baa | 150.0   |

+-------------+------+---------+

2 rows selected (0.068 seconds)

// Starts transaction 2  to transfer 20 from CU001 to CU002

0: jdbc:phoenix:> upsert into bankaccount(customer_id, balance) select customer_id, balance - 20 from bankaccount where customer_id = 'CU001';

1 row affected (0.014 seconds)

0: jdbc:phoenix:> upsert into bankaccount(customer_id, balance) select customer_id, balance + 20 from bankaccount where customer_id = 'CU002';

1 row affected (0.349 seconds)

// Rollback the changes

0: jdbc:phoenix:> !rollback 

Rollback complete (0.007 seconds)

// Should get the same result as above after rollback.

0: jdbc:phoenix:> select * from bankaccount;

+-------------+------+---------+

| CUSTOMER_ID | NAME | BALANCE |

+-------------+------+---------+

| CU001       | foo  | 50.0    |

| CU002       |  baa | 150.0   |

+-------------+------+---------+

2 rows selected (0.038 seconds)


1 (b): Phoenix thick and thin client (using Java application):

try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
 Statement stmt = conn.createStatement();

 stmt.execute("CREATE TABLE IF NOT EXISTS ITEM " +

     " (id varchar not null primary key, name varchar, quantity integer) transactional=true");

 conn.setAutoCommit(false);

 stmt = conn.createStatement();

 stmt.execute("UPSERT INTO ITEM VALUES('ITM001','Book', 5)");

 stmt.execute("UPSERT INTO ITEM VALUES('ITM002','Pen', 5)");

 conn.commit();

 stmt.execute("UPSERT INTO ITEM VALUES('ITM003','Soap', 5)");

 conn.rollback();

 ResultSet rs = stmt.executeQuery("SELECT count(*) FROM ITEM");

 // The number of rows should be two.

 System.out.println("Number of rows " + rs.next());

}

With exception handling

try (Connection conn = DriverManager.getConnection(jdbcUrl)) {

   try {

       Statement stmt = conn.createStatement();

       stmt.execute("CREATE TABLE IF NOT EXISTS ITEM " +

               " (id varchar not null primary key, name varchar, quantity integer) transactional=true");

       conn.setAutoCommit(false);

       stmt = conn.createStatement();

       stmt.execute("UPSERT INTO ITEM VALUES('ITM001','Book', 5)");

       stmt.execute("UPSERT INTO ITEM VALUES('ITM002','Pen', 5)");

       conn.commit();

   } catch (SQLException e) {

       LOG.error("Error occurred while performing transaction:", e);

       conn.rollback();

       // handling the exception object

       throw new RuntimeException(e);

   }

}

2: Phoenix spark application: you can use Phoenix-Spark connector transactions to retry the Spark tasks if there are any conflicts with other jobs or streaming applications.

COD supports the following two types of transactions while writing into the tables. 

  1. Batch wise transactions: Set phoenix.upsert.batch.size to any positive integer value to create transactions for a batch of a particular number of rows.
  2. Partition wise transactions: Set phoenix.upsert.batch.size to 0 to create one transaction per task.

Git link for the example code: https://github.com/cloudera/cod-examples/tree/main/phoenix-spark-transactions

val tableName: String = "SPARK_TEST"

val conn = DriverManager.getConnection(url)

var stmt = conn.createStatement();

stmt.execute("CREATE TABLE SPARK_TEST

 " (ID INTEGER PRIMARY KEY, COL1 VARCHAR, COL2 INTEGER) TRANSACTIONAL=true" +

 " SPLIT ON (200, 400, 600, 800, 1000)")

val spark = SparkSession

 .builder()

 .appName("phoenix-test")

 .master("local")

 .getOrCreate()

val schema = StructType(

 Seq(StructField("ID", IntegerType, nullable = false),

   StructField("COL1", StringType),

   StructField("COL2", IntegerType)))

// Write rows from 1 to 500.

var dataSet = List(Row(1, "1", 1), Row(2, "2", 2))

for (w <- 3 to 500) {

 dataSet = dataSet :+ Row(w, "foo", w);

}

var rowRDD = spark.sparkContext.parallelize(dataSet)

var df = spark.sqlContext.createDataFrame(rowRDD, schema)

// Batch wise transactions:

// ========================

// Setting batch size to 100. For each batch of 100 records one transaction gets created.

var extraOptions = "phoenix.transactions.enabled=true,phoenix.upsert.batch.size=100";

df.write

 .format("phoenix")

 .options(Map("table" -> tableName, PhoenixDataSource.ZOOKEEPER_URL -> zkUrl,

   PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))

 .mode(SaveMode.Overwrite)

 .save()

// Write rows from 500 to 1000.

dataSet = List(Row(501, "500", 500), Row(502, "502", 502))

for (w <- 503 to 1000) {

 dataSet = dataSet :+ Row(w, ""+w, w);

}

// Partition wise transactions:

// ===========================

// Setting batch size 0 means for partition one transaction gets created.

rowRDD = spark.sparkContext.parallelize(dataSet)

df = spark.sqlContext.createDataFrame(rowRDD, schema)

extraOptions = "phoenix.transactions.enabled=true,phoenix.upsert.batch.size=0";

df.write

 .format("phoenix")

 .options(Map("table" -> tableName, PhoenixDataSource.ZOOKEEPER_URL -> zkUrl,

   PhoenixDataSource.PHOENIX_CONFIGS -> extraOptions))

 .mode(SaveMode.Overwrite)

 .save()


How to use transactions with different tools 

The main operations that you use while accessing COD transactions are auto commit on/off, commit, and rollback. These operations are performed in different ways with different tools.

In this section, you can find a link to a popular SQL development tool like DbVisualizer and an example snippet.

DbVisualizer:

https://confluence.dbvis.com/display/UG100/Auto+Commit%2C+Commit+and+Rollback

Summary

In this blog post, we’ve discussed how COD manages transactions happening at multiple terminals using OMID. We have also included various scenarios where you include COD transactions and an end-to-end flow describing how you can implement transactions in a real time scenario.

So, are you ready to try out COD transactions support? Here’s your first step on creating a database using COD.

Annexure

Annexure1:

Step 1: The following property in HBase UI > Configurations tab is set as ‘true’.

phoenix.transactions.enabled=true

Step 2: COD generates OMID client configuration file, hbase-omid-client-config.yml, that contains the transaction server address.

You can use the following command to download the client configuration file and use the configuration in application classpath along with hbase-site.xml.

Like this post? Please share to your friends:
Leave a Reply

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: