Move data from HBase to MySQL

1. Introduction

In this article I will explain how to move data from HBase to MySQL. HBase is an unstructured database, while MySQL is a structured, relational database. With very few changes, the same code can be used to move data from MySQL to HBase.

HBase is the Hadoop database; it stores data as key-value pairs. HBase can store billions of rows by millions of columns. It is built on the Google BigTable model. HBase has its own API to read and write data. For more details on HBase, have a look at http://hbase.apache.org/.

MySQL is one of the most popular open source databases. It is an RDBMS similar to Oracle or Microsoft SQL Server, and it provides JDBC access to its data. You can get more details at http://www.mysql.com/.

Because the interfaces used to access the two databases are different (JDBC for MySQL, a custom Java API for HBase), moving data from HBase to MySQL is a little more complex. I have designed everything so that the application does not know whether the access is JDBC based or uses a custom API. That makes it easy to add more types of databases without breaking applications. So, moving from HBase to MySQL or from MySQL to HBase does not require much code change.

Note: This is an old article, copied manually when moving to a new hosting provider.

Assumptions


1. In HBase, the first column is always used as the row ID, and it is of type integer.
2. All sources maintain the order of columns. That is, the first column of an HBase record is always mapped to the first column of the MySQL record.

2. Common interfaces and classes

We have some common APIs which can be implemented by various database types to provide the required service. All these APIs throw IOException, so any custom exception thrown by a database API has to be wrapped in an IOException.

  • DataContainer : Encapsulates a source of data, for example a table.
    public interface DataContainer {
        public void connect() throws IOException;
        public void close() throws IOException;
    }
  • DataReader : Any source which supports reading data should implement this interface.
    public interface DataReader extends DataContainer {
    
        //Returns the next record, or null when there are no more records.
        public DataRecord readRecord() throws IOException;
    
    }
  • DataWriter : Any source which supports writing data should implement this interface.
    public interface DataWriter extends DataContainer {
        
        public void writeRecord(DataRecord rec) throws IOException;
        
    }
    
  • DataRecord : A class to store a record. For table-based sources, this represents a row. Values are stored in a simple array.
    public class DataRecord {
        
        private Object[] _mData = null;
        
        public DataRecord(Object[] data) {
            _mData = data;
        }
        
        public Object getDataAt(int index) {
            return _mData[index];
        }
        
    }
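Because every source only has to honour these three interfaces, a source can be exercised without any database at all. The sketch below is a minimal, hypothetical in-memory implementation (ListSource is not part of the original code). It repeats the interfaces so that it compiles on its own and follows the same conventions as the real sources: failures are reported as IOException, and readRecord() returns null when no records are left.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Interfaces and record class from this section, repeated so the sketch compiles alone.
interface DataContainer {
    void connect() throws IOException;
    void close() throws IOException;
}

interface DataReader extends DataContainer {
    DataRecord readRecord() throws IOException;
}

interface DataWriter extends DataContainer {
    void writeRecord(DataRecord rec) throws IOException;
}

class DataRecord {
    private final Object[] _mData;

    DataRecord(Object[] data) {
        _mData = data;
    }

    Object getDataAt(int index) {
        return _mData[index];
    }
}

// Hypothetical in-memory source: keeps records in a list so copy logic
// can be tried out without a running database.
class ListSource implements DataReader, DataWriter {
    private final List<DataRecord> _mRecords = new ArrayList<>();
    private int _mReadPos = 0;
    private boolean _mConnected = false;

    @Override
    public void connect() throws IOException {
        _mConnected = true;
        _mReadPos = 0; // each new connection reads from the beginning
    }

    @Override
    public void close() throws IOException {
        _mConnected = false;
    }

    private void checkConnected() throws IOException {
        if (!_mConnected) {
            // Same convention as the real sources: report failures as IOException.
            throw new IOException("Source is not connected");
        }
    }

    @Override
    public DataRecord readRecord() throws IOException {
        checkConnected();
        // null signals "no more records", like MySQLSource and HBaseSource.
        return _mReadPos < _mRecords.size() ? _mRecords.get(_mReadPos++) : null;
    }

    @Override
    public void writeRecord(DataRecord rec) throws IOException {
        checkConnected();
        _mRecords.add(rec);
    }

    int size() {
        return _mRecords.size();
    }
}
```

A copy loop like the one in the Main class later in this article works unchanged against ListSource, which makes it a convenient stand-in when checking column ordering.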
    

3. MySQL implementation

We will write a class that implements DataReader and DataWriter.

public class MySQLSource implements DataWriter, DataReader {
}

Using JDBC calls we can connect to a MySQL server. Our goal is to create a class that can handle any table. So, we create a constructor that takes the table name and the column names. The DataRecord used by the read and write APIs holds the column values in the same order as the column names passed to the constructor.


private String _mTableName = null;
private String _mCols[] = null;

public MySQLSource(String table, String cols[]) {
     _mTableName = table;
     _mCols = cols;
}

The next step is to create a connection to the MySQL server. The connect() implementation takes care of this.


private Connection _mConnection = null;

@Override
public void connect() throws IOException {
    try {
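        try {
            //Driver class of MySQL Connector/J 5.x; Connector/J 8.x uses com.mysql.cj.jdbc.Driver.
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException ex) {
            throw new IOException("Error loading driver", ex);
        }

        //Connection settings are hard-coded for this sample: database, user and password are all "sample".
        _mConnection = DriverManager.getConnection("jdbc:mysql://localhost:3306/sample", "sample", "sample");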
        _mConnection.setAutoCommit(true);
    } catch (SQLException ex) {
        throw new IOException(ex);
    }

}

Once we have the connection, let's write the API to read records from the MySQL source. On the first call, readRecord() builds a SELECT statement from the column names, executes it through a Statement and saves the ResultSet, so later calls do not need to execute the query again. Each call then copies the values of the current row into an array and wraps it in a DataRecord.


private ResultSet _mResultSet = null;

@Override
public DataRecord readRecord() throws IOException {
    if (_mResultSet == null) {
        //Create the SQL statement, execute the query and store the ResultSet instance.
        StringBuilder strBuff = new StringBuilder("SELECT ");
        for (int i = 0; i < _mCols.length - 1; i++) {
            strBuff.append(_mCols[i]).append(",");
        }
        strBuff.append(_mCols[_mCols.length - 1]).append(" FROM ").append(_mTableName);
        try {
            _mResultSet = _mConnection.createStatement().executeQuery(strBuff.toString());
        } catch (SQLException ex) {
            throw new IOException(ex);
        }
    }
        
    //Fetch the record.
    try {
        if (!_mResultSet.next()) {
            return null;
        }
            
        Object data[] = new Object[_mCols.length];
        for(int i=0;i<_mCols.length;i++) {
            data[i] = _mResultSet.getObject(_mCols[i]);
        }
           
        return new DataRecord(data);
        
    } catch (SQLException exp) {
        throw new IOException(exp);
    }

}
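The column list in the SELECT can also be built with String.join (Java 8 or later), which avoids treating the last column specially. Below is a small hypothetical helper (SelectSql is not part of the original code) that produces the same string as the loop in readRecord(). Like the original, it puts the names directly into the SQL, so table and column names must come from trusted code, never from user input.

```java
// Hypothetical helper, not part of the original MySQLSource: builds the same
// SELECT statement that readRecord() assembles with a StringBuilder.
final class SelectSql {
    private SelectSql() {}

    static String build(String table, String[] cols) {
        if (cols.length == 0) {
            throw new IllegalArgumentException("At least one column is required");
        }
        // String.join inserts the separator only between elements,
        // so the last column needs no special handling.
        return "SELECT " + String.join(",", cols) + " FROM " + table;
    }
}
```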

The last API we implement for the MySQL source is the write API. We build the SQL string the same way as for the read API, but prepare it once as a PreparedStatement so it can be reused. Every call to writeRecord() then sets the values of the bind variables and inserts the record.

private PreparedStatement _mInsertStatement = null;

@Override
public void writeRecord(DataRecord rec) throws IOException {
    if (_mInsertStatement == null) {
        try {
            //Use the table passed to the constructor instead of a hard-coded name.
            StringBuilder strBuff = new StringBuilder("INSERT INTO ").append(_mTableName).append("(");
            StringBuilder strBuff2 = new StringBuilder(" VALUES(");
            for (int i = 0; i < _mCols.length - 1; i++) {
                strBuff.append(_mCols[i]).append(",");
                strBuff2.append("?").append(",");
            }
            strBuff.append(_mCols[_mCols.length - 1]).append(") ");
            strBuff2.append("?)");
            strBuff.append(strBuff2.toString());

            _mInsertStatement = _mConnection.prepareStatement(strBuff.toString());
        } catch (SQLException ex) {
            throw new IOException(ex);
        }
    }

    try {
        for(int i=0;i<_mCols.length;i++) {
            _mInsertStatement.setObject(i+1, rec.getDataAt(i));
        }
            
        _mInsertStatement.executeUpdate();
    } catch (SQLException exp) {
        throw new IOException(exp);
    }
}
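The INSERT statement follows the same pattern, with one ? placeholder per column so that the values can be bound with setObject(). A minimal sketch, assuming Java 8 or later for String.join; InsertSql is a hypothetical helper name, not part of the original code.

```java
import java.util.Collections;

// Hypothetical helper (not in the original code): builds the parameterized
// INSERT used by writeRecord(), one "?" placeholder per column.
final class InsertSql {
    private InsertSql() {}

    static String build(String table, String[] cols) {
        if (cols.length == 0) {
            throw new IllegalArgumentException("At least one column is required");
        }
        String columnList = String.join(",", cols);
        // nCopies yields cols.length "?" strings, which are then joined with commas.
        String placeholders = String.join(",", Collections.nCopies(cols.length, "?"));
        return "INSERT INTO " + table + "(" + columnList + ") VALUES(" + placeholders + ")";
    }
}
```

For the PERSON table used later, build("PERSON", new String[] {"PERS_ID", "PERS_DESC"}) gives INSERT INTO PERSON(PERS_ID,PERS_DESC) VALUES(?,?).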

Finally, close the connection.

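@Override
public void close() throws IOException {
    try {
        //Release the statement resources before the connection itself.
        if (_mResultSet != null) {
            _mResultSet.close();
        }
        if (_mInsertStatement != null) {
            _mInsertStatement.close();
        }
        if (_mConnection != null) {
            _mConnection.close();
        }
    } catch (SQLException ex) {
        throw new IOException(ex);
    }
}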

4. HBase implementation

HBase does not use JDBC; it has its own client API to connect to the database. The sample provided here assumes that you have a local HBase setup with all default settings, including the port.

Our class HBaseSource implements DataReader and DataWriter, just like the MySQL source we created above.

public class HBaseSource implements DataReader, DataWriter {
}

Most of the HBase APIs take byte arrays as parameters, so we need a conversion mechanism to store values of the appropriate type in the DataRecord. HBase provides a convenient class, Bytes, which converts between byte arrays and common Java types. But for this, the caller has to know the type of the value stored in the byte array. So, the constructor takes an additional parameter: an array holding the type of each column, in the same order as the column names.


private String _mTableName = null;
private String[] _mHBaseCols = null;
private Class[] _mTypes = null;

public HBaseSource(String table, String hbasecols[], Class types[]) {
    _mTableName = table;
    _mHBaseCols = hbasecols;
    _mTypes = types;
}
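To see why the type array is needed, here is a sketch of the conversions using only JDK classes. It assumes that HBase's Bytes encodes an int as 4 big-endian bytes and a String as UTF-8, which matches the row keys \x00\x00\x00\x01 and \x00\x00\x00\x02 in the scan output shown later. TypedBytes is a hypothetical name, not an HBase class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for HBase's Bytes class, written with plain JDK types.
// Assumption: mirrors Bytes.toBytes(int)/Bytes.toInt (4-byte big-endian) and
// Bytes.toBytes(String)/Bytes.toString (UTF-8).
final class TypedBytes {
    private TypedBytes() {}

    static byte[] encode(Object value, Class<?> type) {
        if (type == Integer.class) {
            // ByteBuffer is big-endian by default.
            return ByteBuffer.allocate(4).putInt((Integer) value).array();
        } else if (type == String.class) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    static Object decode(byte[] bytes, Class<?> type) {
        if (bytes == null) {
            return null; // column missing in this row
        }
        if (type == Integer.class) {
            return ByteBuffer.wrap(bytes).getInt();
        } else if (type == String.class) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }
}
```

The four bytes of row key 1 could just as well be decoded as a String, giving four control characters; nothing in the byte array itself says which interpretation is correct, which is why HBaseSource asks for the column types up front.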

The connect() implementation is very simple. We just get an HBaseConfiguration instance with all default values.

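private Configuration _mConfig = null;

@Override
public void connect() throws IOException {
    //Loads hbase-default.xml and hbase-site.xml from the classpath; the table is opened later.
    _mConfig = HBaseConfiguration.create();
}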

HBase also has the concept of tables. Values are stored as key-value pairs, where a column is identified by a column family optionally followed by a qualifier, in the format family:qualifier. To start with, we need a table instance, which is required for both read and write operations. It is created lazily, only in the API that needs it and only when it is first required.

private HTable _mTable = null;
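The column specifications passed to HBaseSource are parsed with indexOf(":") in both the reader and the writer. Here is a sketch of that parsing as a separate helper; ColumnSpec is hypothetical and not in the original code. Like the original, it treats the string as family:qualifier only when the colon is not the first character, and otherwise uses the whole string as the family name with an empty qualifier.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of HBaseSource): splits a column spec the same way
// initalizeReader() and writeRecord() do, using the first ':' as the separator.
final class ColumnSpec {
    final String family;
    final String qualifier; // empty string when only a family was given

    private ColumnSpec(String family, String qualifier) {
        this.family = family;
        this.qualifier = qualifier;
    }

    static ColumnSpec parse(String spec) {
        int index = spec.indexOf(':');
        if (index > 0) {
            return new ColumnSpec(spec.substring(0, index), spec.substring(index + 1));
        }
        // No separator (or a leading ':'): the whole string is the family name.
        return new ColumnSpec(spec, "");
    }

    byte[] familyBytes() {
        return family.getBytes(StandardCharsets.UTF_8);
    }

    byte[] qualifierBytes() {
        return qualifier.getBytes(StandardCharsets.UTF_8);
    }
}
```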

readRecord() creates a scanner for the table. The scanner (a ResultScanner) lets us iterate through the records in the table and fetch their values; it plays the same role as a ResultSet. We need to initialize it only once, so the method initalizeReader() shown below is called from readRecord() only on the first call.


private ResultScanner _mScanner = null;

private void initalizeReader() throws IOException {
    _mTable = new HTable(_mConfig, _mTableName);
    Scan scan = new Scan();

    //We scan for all columns
    for (String col : _mHBaseCols) {
        int index = col.indexOf(":");

        //Check whether we have family:qualifier or just a family name.
        if (index > 0) {
            scan.addColumn(Bytes.toBytes(col.substring(0, index)), Bytes.toBytes(col.substring(index + 1)));
        } else {
            scan.addFamily(Bytes.toBytes(col));
        }
    }

    //Get instance for scanner.
    _mScanner = _mTable.getScanner(scan);
}

We read records from the scanner. Every record returned by the scanner is an object of type Result. Every row in HBase has an ID (the row key), which can be obtained with Result.getRow(); it returns a byte array that we convert with the Bytes API. In this sample, we handle only Integer and String types. The ID is always the first column, so the column values are fetched from the second column onward.


@Override
public DataRecord readRecord() throws IOException {
    //Check the scanner, not the table: the table may already exist if writeRecord() ran first.
    if (_mScanner == null) {
        initalizeReader();
    }

    Result res = _mScanner.next();
    if (res == null) {
        return null;
    }

    Object values[] = new Object[_mHBaseCols.length];

    for (int i = 0; i < _mTypes.length; i++) {

        String col = _mHBaseCols[i];
        int index = col.indexOf(":");
        Object value = null;
        byte bytes[] = null;

        if (i == 0) {
            //First column is the ID
            bytes = res.getRow();
        } else {
            if (index > 0) {
                bytes = res.getValue(Bytes.toBytes(col.substring(0, index)), Bytes.toBytes(col.substring(index + 1)));
            } else {
                //Family only: the value is stored under an empty qualifier.
                bytes = res.getValue(Bytes.toBytes(col), new byte[0]);
            }
        }
        //A missing column stays null instead of failing in the conversion.
        if (bytes != null) {
            if (_mTypes[i] == Integer.class) {
                value = Bytes.toInt(bytes);
            } else if (_mTypes[i] == String.class) {
                value = Bytes.toString(bytes);
            }
        }

        values[i] = value;
            
    }
        
    DataRecord rec = new DataRecord(values);
    return rec;
}

For writing a record, we just need an instance of the table. Here is the initalizeWriter() implementation.

private void initalizeWriter() throws IOException {
    _mTable = new HTable(_mConfig, _mTableName);
}

To write to an HBase table, we need an instance of Put. A Put represents a record to insert, while a Result represents a record that was read. As with reading, the data has to be converted to byte arrays before writing.

@Override
public void writeRecord(DataRecord rec) throws IOException {
    if (_mTable == null) {
        initalizeWriter();
    }

    //We will use the first item as the key
    Put p = new Put(Bytes.toBytes((int) rec.getDataAt(0)));
    for (int i = 1; i < _mHBaseCols.length; i++) {
        String col = _mHBaseCols[i];
        int index = col.indexOf(":");
        byte data[] = null;
        if (_mTypes[i] == Integer.class) {
            data = Bytes.toBytes((int) rec.getDataAt(i));
        } else if (_mTypes[i] == String.class) {
            data = Bytes.toBytes((String) rec.getDataAt(i));
        }
        if (index > 0) {
            p.add(Bytes.toBytes(col.substring(0, index)), Bytes.toBytes(col.substring(index + 1)), data);
        } else {
            p.add(Bytes.toBytes(col), null, data);
        }
    }

    //Send all columns of the record in a single put, once per row.
    _mTable.put(p);
}

5. MySQL to HBase

We will first move the data from MySQL to HBase and then from HBase to MySQL. Before running the code, we need to create the required tables in MySQL and HBase.

  • MySQL : Connect to MySQL and run the following SQL to create the table and a few records.
    
    create table PERSON(PERS_ID INT PRIMARY KEY, PERS_DESC VARCHAR(100));
    insert into PERSON(PERS_ID, PERS_DESC) VALUES(1,'First person');
    insert into PERSON(PERS_ID, PERS_DESC) VALUES(2,'Second person');
    commit;
    
    
  • HBase : Open a terminal, go to the $HBASE_HOME/bin folder and start the HBase shell using the command
    $./hbase shell
    

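    We will create a sample table in HBase. The code below only uses the column family person (qualifiers such as id and desc do not have to be declared), so the extra families id and desc are optional.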

    create 'sample', 'person','id', 'desc'
    
    

Create a class that we run to test the application.

import java.io.IOException;

public class Main {
    
    public static void main(String[] args) {
        
        //HBase column types
        Class types[] = {Integer.class, String.class};
        //HBase columns
        String hbaseCols[] =  {"person:id","person:desc"};
        //MySQL columns
        String mySQLCols[] =  {"PERS_ID","PERS_DESC"};
        
        HBaseSource hb = new HBaseSource("sample",hbaseCols, types);
        MySQLSource my = new MySQLSource("PERSON",mySQLCols);
        
        try {
            hb.connect();
            my.connect();
            //Copy every record read from MySQL into HBase.
            DataRecord rec = my.readRecord();
            while(rec != null) {
                hb.writeRecord(rec);
                rec = my.readRecord();
            }
            my.close();
            hb.close();
        } catch (IOException ex) {
            ex.printStackTrace();
            return;
        }
    }
}

Run this main class, then verify that the two new records are in HBase by running the following command from the HBase shell.

scan 'sample'

The output should be similar to:

ROW                            COLUMN+CELL                                                                           
 \x00\x00\x00\x01              column=person:desc, timestamp=1382725722596, value=First person                       
 \x00\x00\x00\x02              column=person:desc, timestamp=1382725722609, value=Second person                      
2 row(s) in 0.1220 seconds

Now let's move the records from HBase to MySQL. Before that, we need to remove the existing records from MySQL. Run the following command from the MySQL prompt, connected to our schema.

delete from PERSON;

6. HBase to MySQL

Now swap the roles of the MySQL source and the HBase source in the main class; that is all it takes to move data from HBase to MySQL. The loop becomes:


DataRecord rec = hb.readRecord();
while(rec != null) {
    my.writeRecord(rec);
    rec = hb.readRecord();
}
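Since both sources implement the same interfaces, the copy loop itself can be written once, with the direction chosen purely by argument order. A minimal sketch of such a helper, assuming the interfaces from section 2 (repeated here so it compiles on its own); Transfer.copy is a hypothetical name. It connects both sources, copies until readRecord() returns null, and closes both even if a write fails.

```java
import java.io.IOException;

// Interfaces and record class from section 2, repeated so the sketch compiles alone.
interface DataContainer {
    void connect() throws IOException;
    void close() throws IOException;
}

interface DataReader extends DataContainer {
    DataRecord readRecord() throws IOException;
}

interface DataWriter extends DataContainer {
    void writeRecord(DataRecord rec) throws IOException;
}

class DataRecord {
    private final Object[] _mData;

    DataRecord(Object[] data) {
        _mData = data;
    }

    Object getDataAt(int index) {
        return _mData[index];
    }
}

// Hypothetical helper: the direction of the copy is decided only by argument order.
final class Transfer {
    private Transfer() {}

    // Copies every record from 'from' to 'to' and returns the number copied.
    static int copy(DataReader from, DataWriter to) throws IOException {
        from.connect();
        try {
            to.connect();
            try {
                int count = 0;
                DataRecord rec;
                while ((rec = from.readRecord()) != null) {
                    to.writeRecord(rec);
                    count++;
                }
                return count;
            } finally {
                to.close(); // runs even if a read or write fails
            }
        } finally {
            from.close();
        }
    }
}
```

With such a helper, the two directions in this article would be Transfer.copy(my, hb) and Transfer.copy(hb, my), without calling connect() separately.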

Run the Main class again and check the results from the MySQL prompt.

mysql> select * from PERSON;
+---------+---------------+
| PERS_ID | PERS_DESC     |
+---------+---------------+
|       1 | First person  |
|       2 | Second person |
+---------+---------------+
2 rows in set (0.00 sec)

Source code is available here