本文介绍如何使用Databricks Delta进行Spark作业的优化。

前提条件

已创建集群,详情请参见创建集群

集群应满足以下配置:

区域 详情
地域(Region) 华北2(北京)
集群规模 1个Master节点,5个Worker节点
ECS实例配置 配置如下:

  • CPU:32核
  • 内存:128GiB
  • ECS规格:ecs.g6.8xlarge
  • 数据盘配置:ESSD云盘300GB X 4块
  • 系统盘配置:ESSD云盘120GB X 1块
说明 ECS实例会因库存等原因和实际售卖页有出入。此处参数仅供参考,具体请您根据实际情况选择相应的实例规格进行测试。

OSS宽带 10Gbps

背景信息

Databricks数据洞察内置了Databricks商业版引擎,您可以利用Databricks数据洞察创建集群,实现在秒级响应时间内处理PB级别的数据。本文示例制造100亿条数据,利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,对Spark作业进行改造,达到优化性能的目的。Databricks Delta详情请参见Processing Petabytes of Data in Seconds with Databricks Delta

阿里云2000元代金券免费领,最新优惠1折抢购,2核4G云服务器仅799元/3年,新老用户同享,立即抢购>>>

配置Spark

  1. 使用阿里云账号登录Databricks数据洞察控制台
  2. 在Databricks数据洞察控制台页面,选择所在的地域(Region)。
    创建的集群将会在对应的地域内,一旦创建后不能修改。
  3. 在左侧导航栏中,单击集群
  4. 单击待配置集群所在行的详情
  5. 集群详情页面,单击上方的Spark配置
  6. 配置以下参数。
    • 修改以下配置。
      参数 描述
      spark.driver.cores 4
      spark.driver.memory 8G
      spark.executor.memory 23G
    • 新增以下配置。
      1. 配置区域,单击spark-defaults页签。
      2. 单击右侧的自定义配置

        设置以下配置。

        参数 描述
        spark.executor.cores 3
        spark.executor.instances 22
        spark.yarn.executor.memoryOverhead default

示例

  1. 准备数据。
    • 准备测试数据和query脚本。

      在集群中生成数据预计需要5小时,生成测试数据详情请参见Processing Petabytes of Data in Seconds with Databricks Delta

    • 准备五张表:
      • conn_random:delta格式表
      • conn_random_parquet:parquet格式表
      • conn_optimize:经过OPTIMIZE的表,主要是Compaction
      • conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)
      • conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)
  2. 使用OPTIMIZE命令进行优化。
    详细代码如下:

    import spark.implicits._  val seed = 0 val numRecords = 10*1000*1000*1000L val numFiles = 1000*1000  val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/"  val dbName = s"mdc_random_$numFiles" val connRandom = "conn_random" val connRandomParquet = "conn_random_parquet" // val connSorted = "conn_sorted" val connOptimize = "conn_optimize" val connZorderOnlyIp = "conn_zorder_only_ip" val connZorder = "conn_zorder"  spark.conf.set("spark.sql.shuffle.partitions", numFiles) spark.conf.get("spark.sql.shuffle.partitions")  sql(s"drop database if exists $dbName cascade") sql(s"create database if not exists $dbName") sql(s"use $dbName") sql(s"show tables").show(false)  import scala.util.Random  case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)  // 生成数据 def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") def randomPort(r: Random) = r.nextInt(65536)  def randomConnRecord(r: Random) = ConnRecord(   src_ip = randomIPv4(r), src_port = randomPort(r),   dst_ip = randomIPv4(r), dst_port = randomPort(r))  val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>   val partitionID = it.toStream.head   val r = new Random(seed = partitionID)   Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r)) }  // 生成数据表 df.write .mode("overwrite") .format("delta") .option("path", baseLocation + connRandom) .saveAsTable(connRandom)  df.write .mode("overwrite") .format("parquet") .option("path", baseLocation + connRandomParquet) .saveAsTable(connRandomParquet)  spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connOptimize) .saveAsTable(connOptimize)  spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorderOnlyIp) .saveAsTable(connZorderOnlyIp)  spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorder) .saveAsTable(connZorder)  spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")  // OPTIMIZE优化命令 sql(s"OPTIMIZE '${baseLocation + connOptimize}'") sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)") sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")

  3. 验证Spark SQL。
    select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%';  select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';  select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%';  select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';  select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';

测试结论

本示例各表情况如下。

表名称 时间(s)
conn_random_parquet 2504
conn_random 2324
conn_optimize 112
conn_zorder 65
conn_zorder_only_ip 46
说明 通过以上示例,可以发现:

  • 经过OPTIMIZE的表,文件大小会在1G左右,而且进行了delta元数据的优化,提高了data-skipping的效率,在性能上提升约20倍(2504/112=22X)。
  • Zorder使得data-skipping的优化效果进一步深化,性能提升约40倍(2504/65=38X)。
  • 当Zorder列是查询列时,优化效果会更加明显,实验显示性能提升约50倍(2504/46=54X)。

问题反馈

您在使用阿里云Databricks数据洞察过程中有任何疑问,欢迎用钉钉扫描下面的二维码加入钉钉群进行反馈。

使用Databricks Delta优化Spark作业_最佳实践_Databricks数据洞察