How to Bulk Load Data from Text File to Big Data Hadoop HBase Table?

In this post we walk through bulk loading data from a text file into an HBase table, using the HBase bulk load feature together with the HBase Java client API. We cover what bulk loading is, when to use it, and what the process looks like.

Bulk Loading:

HBase gives us random, real-time read/write access to Big Data. We generally load data into an HBase table through the client APIs or with a MapReduce job that uses TableOutputFormat, but for large imports both approaches push every write through the regular write path, which can be problematic. The HBase bulk loading feature is much easier to use for such imports and can insert the same amount of data more quickly.
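
For contrast, here is a rough sketch of the usual client-API write path, using the table and column-family names from later in this post (the row key and cell values are just hypothetical placeholders). Every Put written this way travels through the WAL and the MemStore on the RegionServer, which is exactly the overhead bulk loading avoids:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SingleRowPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // HBase 0.94-style client API
        HTable table = new HTable(conf, "user");
        Put put = new Put(Bytes.toBytes("row1")); // hypothetical row key
        put.add(Bytes.toBytes("personalDetails"), Bytes.toBytes("first_name"), Bytes.toBytes("John"));
        put.add(Bytes.toBytes("contactDetails"), Bytes.toBytes("city"), Bytes.toBytes("London"));
        // This write goes through the RegionServer's WAL and MemStore.
        table.put(put);
        table.close();
    }
}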

When to use Bulk Loading:

If you have any of these symptoms, bulk loading is probably a good choice for you:

  • You needed to tweak MemStores to use most of the memory.
  • You needed to either use bigger WALs or bypass them entirely.
  • Your compaction and flush queues are in the hundreds.
  • Your GC is out of control because your inserts range in the MBs.
  • Your latency goes out of your SLA when you import data.

Generally speaking, bulk loading is the process of preparing HFiles and loading them directly into the RegionServers, thus bypassing the write path and avoiding the issues related to it.

The bulk loading process looks like:

  1. Extract the data from the source (in our case, from a text file).
  2. Transform the data into HFiles.
  3. Load the files into HBase by telling the RegionServers where to find them.
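
As a side note, step 3 does not have to be done programmatically; HBase also ships a completebulkload tool that loads already-generated HFiles from the command line. A sketch, assuming the HBase jar and an hbase-site.xml are available on the machine (the jar path, output path, and table name below are placeholders):

hadoop jar /path/to/hbase-0.94.6.jar completebulkload <hdfsOutputPathWithHFiles> <tableName>

In this post, however, we do the same thing from Java using LoadIncrementalHFiles.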

So, after this quick overview of bulk loading, let us look at a simple example: loading a comma-separated text file into HBase using the bulk loading feature. To do this, please follow the steps below.

Environment:

  • Java : 1.7.0_75
  • Hadoop : 1.0.4
  • HBase : 0.94.6

Prerequisites:

  1. A data file (here we use a comma-separated user.txt as the sample data file).
  2. The HBase table must already exist, and the column family names must be the same in the HBase table and in our bulk loading code.
  3. We have an HBase table “user” with two column families, “personalDetails” and “contactDetails”.
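
For reference, such a table can be created from the HBase shell like this (same table and column-family names as above):

create 'user', 'personalDetails', 'contactDetails'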

Our sample data file user.txt looks like:

[Screenshot: contents of the sample data file user.txt]

It is stored in the following format:

RowKey, First_Name, Last_Name, Email, City
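
The original screenshot is not reproduced here, so purely as an illustration, a file in this format would contain lines such as the following (the values are made up):

1,John,Doe,john.doe@example.com,London
2,Jane,Smith,jane.smith@example.com,Paris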

For bulk loading, our code consists of the following three classes:

HBaseBulkLoadDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBaseBulkLoadDriver extends Configured implements Tool {	
    private static final String DATA_SEPERATOR = ",";	
    private static final String TABLE_NAME = "user";	
    private static final String COLUMN_FAMILY_1="personalDetails";	
    private static final String COLUMN_FAMILY_2="contactDetails";	
    /**
     * HBase bulk import example
     * Data preparation MapReduce job driver
     * 
     * args[0]: HDFS input path
     * args[1]: HDFS output path
     * 
     */
    public static void main(String[] args) {		
        try {
            int response = ToolRunner.run(HBaseConfiguration.create(), new HBaseBulkLoadDriver(), args);			
            if(response == 0) {				
                System.out.println("Job is successfully completed...");
            } else {
                System.out.println("Job failed...");
            }
        } catch(Exception exception) {
            exception.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        int result=0;
        String outputPath = args[1];		
        Configuration configuration = getConf();		
        configuration.set("data.seperator", DATA_SEPERATOR);		
        configuration.set("hbase.table.name",TABLE_NAME);		
        configuration.set("COLUMN_FAMILY_1",COLUMN_FAMILY_1);		
        configuration.set("COLUMN_FAMILY_2",COLUMN_FAMILY_2);		
        Job job = new Job(configuration);		
        job.setJarByClass(HBaseBulkLoadDriver.class);		
        job.setJobName("Bulk Loading HBase Table::"+TABLE_NAME);		
        job.setInputFormatClass(TextInputFormat.class);		
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);		
        job.setMapperClass(HBaseBulkLoadMapper.class);		
        FileInputFormat.addInputPaths(job, args[0]);		
        FileSystem.get(getConf()).delete(new Path(outputPath), true); // clear any previous output directory on HDFS
        FileOutputFormat.setOutputPath(job, new Path(outputPath));		
        job.setMapOutputValueClass(Put.class);		
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(configuration,TABLE_NAME));		
        job.waitForCompletion(true);		
        if (job.isSuccessful()) {
            HBaseBulkLoad.doBulkLoad(outputPath, TABLE_NAME);
        } else {
            result = -1;
        }
        return result;
    }
}

HBaseBulkLoadMapper.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    private String dataSeperator;
    private String columnFamily1;
    private String columnFamily2;

    @Override
    public void setup(Context context) {
        Configuration configuration = context.getConfiguration();
        dataSeperator = configuration.get("data.seperator");
        columnFamily1 = configuration.get("COLUMN_FAMILY_1");
        columnFamily2 = configuration.get("COLUMN_FAMILY_2");
    }

    @Override
    public void map(LongWritable key, Text value, Context context) {
        try {
            String[] values = value.toString().split(dataSeperator);
            String rowKey = values[0];
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("first_name"), Bytes.toBytes(values[1]));
            put.add(Bytes.toBytes(columnFamily1), Bytes.toBytes("last_name"), Bytes.toBytes(values[2]));
            put.add(Bytes.toBytes(columnFamily2), Bytes.toBytes("email"), Bytes.toBytes(values[3]));
            put.add(Bytes.toBytes(columnFamily2), Bytes.toBytes("city"), Bytes.toBytes(values[4]));
            // Emit the row key (not the table name) as the map output key, so that
            // HFileOutputFormat can sort and partition the Puts by row key.
            context.write(new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }
}

HBaseBulkLoad.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HBaseBulkLoad {	
    /**
     * doBulkLoad.
     *
     * @param pathToHFile path to hfile
     * @param tableName 
     */
    public static void doBulkLoad(String pathToHFile, String tableName) {
        try {		
            Configuration configuration = new Configuration();			
            configuration.set("mapreduce.child.java.opts", "-Xmx1g");	
            HBaseConfiguration.addHbaseResources(configuration);	
            LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(configuration);
            HTable hTable = new HTable(configuration, tableName);
            loadFiles.doBulkLoad(new Path(pathToHFile), hTable);
            System.out.println("Bulk Load Completed..");		
        } catch(Exception exception) {			
            exception.printStackTrace();			
        }		
    }	
}

To run the code, follow the steps below:

  1. Include all libraries from lib/
  2. Export the project’s JAR as <YourProjectName>.jar
  3. Copy <YourInputFile>.txt to HDFS
  4. Create the table using: create '<tablename>', '<ColumnFamily1>', '<ColumnFamily2>'
  5. Run the JAR file using: hadoop jar <YourProjectName>.jar <hdfsDirectory>/<YourInputFile>.txt <hdfsDirectory>/<outputname>/ (see the example commands after this list)
  6. Modify the MapReduce job for your needs
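
Putting steps 3 to 5 together with hypothetical file, project, and path names (and assuming the exported JAR has HBaseBulkLoadDriver set as its main class), the command sequence might look like this:

hadoop fs -mkdir /user/hduser/input
hadoop fs -put user.txt /user/hduser/input/user.txt
echo "create 'user', 'personalDetails', 'contactDetails'" | hbase shell
hadoop jar BulkLoadExample.jar /user/hduser/input/user.txt /user/hduser/output/

If the job fails with a NoClassDefFoundError for HBase classes, the HBase jars are probably missing from the Hadoop classpath; exporting HADOOP_CLASSPATH=$(hbase classpath) before running the job is one way to add them.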

After running the code, you can see the result using the ‘scan’ command in the HBase shell. Our output looks like:

[Screenshot: HBase shell ‘scan’ output for the user table]
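
The scan itself is simply (table name as used throughout this post):

scan 'user'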

You can also find the generated HFiles for each column family in HDFS. Our output folder in HDFS looks like:

[Screenshot: HDFS output folder with the generated HFiles]

Code Walkthrough:

  • Most of the code is self-explanatory, so you can easily follow it and get a line-by-line understanding of it.
  • In HBaseBulkLoadMapper.java we extract the data from the text file and put each record into an HBase Put.
  • The configureIncrementalLoad() method configures the job so that its output is generated as HFiles.
  • HBaseBulkLoad.java then loads the generated HFiles into the HBase table.

We hope this post and the programs above help the big data and Hadoop development community understand bulk loading and its process.

This article was written by Samual Alister, a Big Data Hadoop developer working with Aegis Soft Tech, with deep experience in Hadoop architecture.



Article Published by Souvik Banerjee

Web Developer & SEO Specialist with 10+ years of experience in Open Source Web Development, specializing in Joomla & WordPress development. He is also the moderator of this blog, “RS Web Solutions”.


18 thoughts on “How to Bulk Load Data from Text File to Big Data Hadoop HBase Table?”

  1. Hi Souvik. Since I am just starting to use HBase and Hadoop, I am trying to use yours as an inspiration. When I try it, it shows this:

    Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
    at bulkload.example.HBaseBulkLoadDriver.main(HBaseBulkLoadDriver.java:46)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    … 7 more

    Can you please tell me what is wrong? Thanks.

  2. Hi,
    I am getting this error while running this code, please help:

    Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was

    • You can get this error for various reasons. Please check the task log; it will show you the reason why you are getting the error.

      • Where do I find the task log? Can you please help me with the location? I found the job in the UI, but there it says the log server is not configured.

          • Thanks a lot.
            I found this error in the log:

            2016-08-08 23:39:34,865 INFO [main] org.apache.hadoop.mapred.Task: Using ResourceCalculatorProcessTree : [ ]
            2016-08-08 23:39:35,323 INFO [main] org.apache.hadoop.mapred.MapTask: Processing split: hdfs://quickstart.cloudera:8020/user/cloudera/su.txt:0+50
            2016-08-08 23:39:35,515 INFO [main] org.apache.hadoop.mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
            2016-08-08 23:39:35,519 INFO [main] org.apache.hadoop.mapred.MapTask: mapreduce.task.io.sort.mb: 100
            2016-08-08 23:39:35,519 INFO [main] org.apache.hadoop.mapred.MapTask: soft limit at 83886080
            2016-08-08 23:39:35,519 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufvoid = 104857600
            2016-08-08 23:39:35,519 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396; length = 6553600
            2016-08-08 23:39:35,544 WARN [main] org.apache.hadoop.mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
            java.lang.NullPointerException
            at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1012)
            at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
            at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
            at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
            at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
            at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
            at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:415)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
            at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
            2016-08-08 23:39:35,548 WARN [main] org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:cloudera (auth:SIMPLE) cause:java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
            2016-08-08 23:39:35,549 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
            at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:414)
            at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
            at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
            at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
            at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
            at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:415)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
            at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
            Caused by: java.lang.NullPointerException
            at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1012)
            at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
            … 9 more

  3. Hi,

    An error got resolved, but after running the MR program the output folder contains only _SUCCESS; it did not generate the output files for my column families (in my case “personal data” and “professional data”). Because of that, the HFiles were not created and therefore could not be moved to the HBase table.

    Please help me.

  4. Awesome. I had some trouble running it because of a heap memory issue. I added the following to resolve it:

    Configuration configuration = getConf();
    configuration.set("mapred.child.java.opts", "-Xmx300m");

    Thanks a lot

  5. I am getting the below error:

    6/07/22 10:08:02 ERROR zookeeper.ZooKeeperNodeTracker: Check the value configured in ‘zookeeper.znode.parent’. There could be a mismatch with the one configured in the master.
    java.lang.IllegalArgumentException: Check the value configured in ‘zookeeper.znode.parent’. There could be a mismatch with the one configured in the master.
    at org.apache.hadoop.hbase.zookeeper.RootRegionTracker.waitRootRegionLocation(RootRegionTracker.java:81)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:849)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:962)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:860)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegionInMeta(HConnectionManager.java:962)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:864)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.locateRegion(HConnectionManager.java:821)
    at org.apache.hadoop.hbase.client.HTable.finishSetup(HTable.java:234)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:174)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:133)
    at com.HBase.BulkLoad.HbaseBulkLoadDriver.run(HbaseBulkLoadDriver.java:71)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at com.HBase.BulkLoad.HbaseBulkLoadDriver.main(HbaseBulkLoadDriver.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

  6. Souvik, for me the HFiles are getting created but I am not able to load them into the HBase table. I don’t see any error in the job, but the data is not getting loaded into HBase either. Any solution will help me.

  7. There’s a common MapReduce error in the mapper code.

    You shouldn’t emit the table name as the key in context.write(hbaseTableName, put);
    but instead:
    context.write(new ImmutableBytesWritable(rowKey), put);

    With the original code, all data will be sent to the same reducer and the same reduce method invocation => Java heap space.

  8. Good explanation Souvik. I tried the same code and it worked well for me. Thanks for sharing it.
    One thing, if anyone is referring to this: I had a different user for HBase, so I had to give 777 permission on the output folder while the program was running.