侯上校
  • 浏览: 217089 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

解决数据倾斜

阅读更多
import org.apache.spark._
import scala.collection.mutable._
import scala.collection.mutable.HashMap
import org.apache.spark.Partitioner
import java.net.URL
// Obtain (or reuse) the shared SparkContext and load the raw data file.
val sc = SparkContext.getOrCreate
val rdd1 = sc.textFile("/home/hadoop/my.data")
// Distinct host names taken from the URL field (first comma-separated
// column of every line), collected to the driver as an Array[String].
val rdd2 = rdd1
  .map(line => new URL(line.split(",")(0)).getHost)
  .distinct()
  .collect
// Key every line by its URL (first comma-separated column).  The custom
// partitioner parses the host out of the key itself, so no host needs to
// be computed here (the original map built an unused `host` local per record).
val rdd3 = rdd1.map(x => (x.split(",")(0), x))
// Repartition so that all records of one host land in the same partition.
val rdd4 = rdd3.partitionBy(new HOSTpartitioner(rdd2))
rdd4.saveAsTextFile("/home/hadoop/par")
// saveAsHadoopFile uses the old-API (mapred) output format; the original
// snippet referenced Text/TextOutputFormat without importing them.
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat
rdd4.saveAsHadoopFile("/home/hadoop/partit", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])

// Sanity check: collect just the URL column to the driver.
rdd1.map(x => x.split(",")(0)).collect

/**
 * Partitioner that routes every record to the partition of its URL's host.
 *
 * @param ins distinct host names; each host is assigned the partition whose
 *            index is the host's position in this array.  Keys whose host is
 *            not listed fall back to partition 0.
 */
class HOSTpartitioner(ins: Array[String]) extends Partitioner {
  // host -> partition index, built once from the input array.
  // (Replaces the original mutable HashMap + `var count` loop.)
  val parMap: Map[String, Int] = ins.zipWithIndex.toMap

  // One partition per supplied host (same value as the original counter).
  override def numPartitions: Int = ins.length

  // Keys are full URL strings: extract the host and look up its partition.
  // Unknown hosts go to partition 0, matching the original getOrElse default.
  override def getPartition(key: Any): Int = {
    val host = new URL(key.toString).getHost
    parMap.getOrElse(host, 0)
  }
}


// --- spark-shell transcript demonstrating HashPartitioner. ---
// NOTE(review): these lines were entered interactively; `rdd11` is re-bound
// below and `rdd1` shadows the earlier textFile RDD — the REPL allows this,
// but pasting the whole file as a script would not compile.
var rdd11 = sc.makeRDD(Array((1, "A"), (2,"B"), (3,"C"), (4, "D")) ,2)
var rdd1 = sc.makeRDD(Array( "A","B","C", "D") ,2)
rdd1.partitions.size
// Only pair RDDs can be partitionBy'd; hash the keys into 2 partitions.
val rdd22 = rdd11.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd22.partitions.size

var rdd11 = sc.makeRDD(Array("A", "B", "C", "D") ,2)
http://www.baidu.com,123
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics