Spark连接HBase

2018/3/7 9:55:02 人评论 次浏览 分类:大数据

(一)、Spark读取HBase中的数据

hbase中的数据

 

 1 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
 2 import org.apache.hadoop.hbase.client.HBaseAdmin
 3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 4 import org.apache.spark._
 5 import org.apache.hadoop.hbase.util.Bytes
 6 
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 从hbase读取数据转化成RDD
11   */
12 object SparkReadHBase {
13 
14   def main(args: Array[String]): Unit = {
15     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16     val sc = new SparkContext(sparkConf)
17 
18     val tablename = "account"
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24     conf.set(TableInputFormat.INPUT_TABLE, tablename)
25 
26     // 如果表不存在则创建表
27     val admin = new HBaseAdmin(conf)
28     if (!admin.isTableAvailable(tablename)) {
29       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30       admin.createTable(tableDesc)
31     }
32 
33     //读取数据并转化成rdd
34     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36       classOf[org.apache.hadoop.hbase.client.Result])
37 
38     val count = hBaseRDD.count()
39     println(count)
40     hBaseRDD.foreach{case (_,result) =>{
41       //获取行键
42       val key = Bytes.toString(result.getRow)
43       //通过列族和列名获取列
44       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46       println("Row key:"+key+" Name:"+name+" Age:"+age)
47     }}
48 
49     sc.stop()
50     admin.close()
51   }
52 
53 }

(二)、Spark写HBase

  1.第一种方式:

 1 import org.apache.hadoop.hbase.HBaseConfiguration
 2 import org.apache.hadoop.hbase.client.Put
 3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
 5 import org.apache.hadoop.hbase.util.Bytes
 6 import org.apache.hadoop.mapred.JobConf
 7 import org.apache.spark.{SparkConf, SparkContext}
 8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
 9 /**
10   * Created by *** on 2018/2/12.
11   *
12   * 使用saveAsHadoopDataset写入数据
13   */
14 object SparkWriteHBaseOne {
15   def main(args: Array[String]): Unit = {
16     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17     val sc = new SparkContext(sparkConf)
18 
19     val conf = HBaseConfiguration.create()
20     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21     conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22     //设置zookeeper连接端口,默认2181
23     conf.set("hbase.zookeeper.property.clientPort", "2181")
24 
25     val tablename = "account"
26 
27     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
28     val jobConf = new JobConf(conf)
29     jobConf.setOutputFormat(classOf[TableOutputFormat])
30     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31 
32     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33 
34 
35     val rdd = indataRDD.map(_.split(',')).map{arr=>{
36       /*一个Put对象就是一行记录,在构造方法中指定主键
37        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
38        * Put.add方法接收三个参数:列族,列名,数据
39        */
40       val put = new Put(Bytes.toBytes(arr(0).toInt))
41       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
44       (new ImmutableBytesWritable, put)
45     }}
46 
47     rdd.saveAsHadoopDataset(jobConf)
48 
49     sc.stop()
50   }
51 }

  2.第二种方式:

 1 import org.apache.hadoop.hbase.client.{Put, Result}
 2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
 4 import org.apache.hadoop.hbase.util.Bytes
 5 import org.apache.hadoop.mapreduce.Job
 6 import org.apache.spark._
 7 /**
 8   * Created by *** on 2018/2/12.
 9   *
10   * 使用saveAsNewAPIHadoopDataset写入数据
11   */
12 object SparkWriteHBaseTwo {
13   def main(args: Array[String]): Unit = {
14     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15     val sc = new SparkContext(sparkConf)
16 
17     val tablename = "account"
18 
19     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
22 
23     val job = new Job(sc.hadoopConfiguration)
24     job.setOutputKeyClass(classOf[ImmutableBytesWritable])
25     job.setOutputValueClass(classOf[Result])
26     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
27 
28     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29     val rdd = indataRDD.map(_.split(',')).map{arr=>{
30       val put = new Put(Bytes.toBytes(arr(0)))
31       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33       (new ImmutableBytesWritable, put)
34     }}
35 
36     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
37   }
38 }

 

相关知识

  • SparkStreaming与Kafka整合遇到的问题及解决方案

    前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服…

    2017/7/20 11:45:03
  • spark极简入门

    1.windows上下载安装sbt 去sbt官网下载 sbt包,解压到指定目录,不需要安装。记得配置环境变量。 新建 SBT_HOME ,值是sbt包的解压路径,比如C:\Users\***\Tools\sbt-0.13.15\sbt(建议不要放在C盘) 并在path 中添加 %SBT_HOME%\bin 查看是否成功,命令行输入: sbt sbtVer…

    2017/7/20 11:45:03
  • Redis精华

    Redis的复制功能是完全建立在之前我们讨论过的基于内存快照的持久化策略基础上的,也就是说无论你的持久化策略选择的是什么,只要用到了redis的复制功能,就一定会有内存快照发生,那么首先要注意你的系统内存容量规划,原因可以参考我上一篇文章中提到的Redis磁盘IO问题。R…

    2017/7/20 11:45:03
  • 快速搭建 ELK + OpenWAF 环境

    摘要: OpenWAF是第一个全方位开源的Web应用防护系统; ELK 是比较火的开源日志分析系统; 本节主要介绍,ELK 的 docker 部署及与 OpenWAF 的结合 OpenWAF简介 OpenWAF是第一个全方位开源的Web应用防护系统(WAF),他基于nginx_lua API分析HTTP请求信息。OpenWAF由行为分析引擎…

    2017/7/20 11:45:03

共有访客发表了评论 网友评论

验证码: 看不清楚?