Note

给Spark添加自定义的metric信息

最近因为一些工作场景需要获取spark 任务的更多信息,所以要修改spark 源码添加新的metric。顺便串一下整个metric体系,形成整体认知。

以下讨论基于spark3.1.2,让我们开始吧

16805751b44d1079.gif

ListenerBus 机制

在动手实操之前,首先我们需要了解spark的listener机制
spark中存在多个模块的协同操作,通过考虑到实时性和结构的解耦,spark引入了事件总线的机制,通过注册和监听事件的方式,可以spark各种事件的触发点执行不同的操作。

在spark中可以指定某个操作的执行作为事件,并事先注册在事件总线中。当该操作执行后发送事件,事件总线就会接受到该事件并异步地将该事件传递给所有监听该事件的监听器,并执行对应方法。这实际上也是典型的生产-消费者模式。我们需要人为的生产一条事件,然后便会有消费者去消费这个事件。最经典的用法就是在某个任务执行前后监听该任务,并近实时获取该任务的开始结束状态,从而获取该任务执行过程中的各种统计信息,也就是我们下面会讲到的metric。


在spark中如果想要自行添加事件监听也非常简单,不过对代码有一些侵入性。这里以task开始执行的为例。
首先我们需要注册事件到listenerBus,先创建一个case class继承SparkListenerEvent

case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent

SparkListenerEvent的具体实现如下:

<at-mention handle="DeveloperApi">@DeveloperApi
</at-mention>@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}

SparkListenerEvent 会被SparkListener接受,并回调指定的方法,具体看SparkListener.scala中的实现:

<at-mention handle="DeveloperApi">@DeveloperApi
</at-mention>abstract class SparkListener extends SparkListenerInterface {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
  // Some code here ...
  override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}

从上面的代码中我们可以得知两点,一是spark内置了一些event的回调方法,但是如果我们自定义event,最终会走到onOtherEvent方法中,所以实际上只需要继承onOtherEvent方法,并在其中进行模式匹配,就可以监听我们自定义的事件,从而减少了对spark core代码的侵入。 二是我们可以继承SparkListener来重载方法,从而加入我们自己的监听逻辑.

至于SparkListener 是如何实现监听的,我们就需要来看ListenerBus的具体实现,在ListenerBus.scala中:

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

//Some code here ...

/**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      lazy val listenerName = Utils.getFormattedClassName(listener)
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          // We want to throw the InterruptedException right away so we can associate the interrupt
          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
          removeListenerOnError(listener)
        case NonFatal(e) if !isIgnorableException(e) =>
          logError(s"Listener ${listenerName} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          val elapsed = maybeTimerContext.stop()
          if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
            logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
              s"${elapsed / 1000000000d}s.")
          }
        }
      }
    }
  }

}

上面的代码可以简单解释一下,在listenerBus中实际用一个list保存所有的监听器Listener,并用一个定时器去定时调度,执行监听方法。这里只是基础实现,实际上我们常用的SparkListenerBus 具体由AsyncEventQueue实现,在实际发送事件时采用异步的方式,不会引起阻塞。


了解了ListenerBus的具体实现,我们就可以更自如的使用它。在我们具体的实现类中,只需要传入listener Bus的实例,如下面dagScheduler.scala的例子:

private[spark] class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

    // some code here ...

}

然后在需要监听的行为后面发送事件:

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
    // Note that there is a chance that this task is launched after the stage is cancelled.
    // In that case, we wouldn't have the stage anymore in stageIdToStage.
    val stageAttemptId =
      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
  }

这样,在执行到任务开始的逻辑后,spark就会自动监听并发送事件,我们的listener接受到事件后,就会执行onTaskStart的方法。

Metric体系

spark 引用了一个第三方的metric 系统 其基本逻辑也就是从注册一些指标,用kv存储的方式统计这些指标,并实现对应的输出
主要概念有三个

  • source metric的来源,例如JvmSource

  • sink metric的输出, spark支持多种输出方式,例如CSVSink

  • MetricRegistry,具体存储metric的类,由第三方库实现

    我们今天暂时不讲metric如果具体实现,而是关注SparkUI如何展示这些metric。以ExecutorMetric为例。
    进入ExecutorMetric.scala, 我们可以看到实际上executorMetric实际是一层封装,具体的metric信息在ExecutorMetricType.scala中:

class ExecutorMetrics private[spark] extends Serializable {
  // Metrics are indexed by ExecutorMetricType.metricToOffset
  private val metrics = new Array[Long](ExecutorMetricType.numMetrics)

/** Returns the value for the specified metric. */
  def getMetricValue(metricName: String): Long = {
    metrics(ExecutorMetricType.metricToOffset(metricName))
  }
  // Some code here ..
  }
private[spark] object ExecutorMetricType {

  // List of all executor metric getters
  val metricGetters = IndexedSeq(
    JVMHeapMemory,
    JVMOffHeapMemory,
    OnHeapExecutionMemory,
    OffHeapExecutionMemory,
    OnHeapStorageMemory,
    OffHeapStorageMemory,
    OnHeapUnifiedMemory,
    OffHeapUnifiedMemory,
    DirectPoolMemory,
    MappedPoolMemory,
    ProcessTreeMetrics,
    GarbageCollectionMetrics
  )

  val (metricToOffset, numMetrics) = {
    var numberOfMetrics = 0
    val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
    metricGetters.foreach { m =>
      (0 until m.names.length).foreach { idx =>
        definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
      }
      numberOfMetrics += m.names.length
    }
    (definedMetricsAndOffset, numberOfMetrics)
  }
}

这里我们就不看这些metric的实际统计方法了,只关注如何保存这些metric。实际上在ExecutorMetricType的类中很多都是调用JMX实现的统计。

这些metric是如何展示的呢?我们首先来看看api.scala,这里保存了executorMetric:

class ExecutorStageSummary private[spark](
    val taskTime : Long,
    val failedTasks : Int,
    val succeededTasks : Int,
    val killedTasks : Int,
    val inputBytes : Long,
    val inputRecords : Long,
    val outputBytes : Long,
    val outputRecords : Long,
    val shuffleRead : Long,
    val shuffleReadRecords : Long,
    val shuffleWrite : Long,
    val shuffleWriteRecords : Long,
    val memoryBytesSpilled : Long,
    val diskBytesSpilled : Long,
    @deprecated("use isExcludedForStage instead", "3.1.0")
    val isBlacklistedForStage: Boolean,
    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
    val peakMemoryMetrics: Option[ExecutorMetrics],
    val isExcludedForStage: Boolean)

这里随便挑了一个类,实际上还有别的类里保存了executorMetric,我们可以发现这里的metric信息排列其实和sparkUI上的展示如出一辙,可以联想到,在前端展示的数据格式就是这种。

再向上追溯,我们发现它被封装成LiveEntity:

private class LiveExecutorStageSummary(
    stageId: Int,
    attemptId: Int,
    executorId: String) extends LiveEntity {

  import LiveEntityHelpers._

  var taskTime = 0L
  var succeededTasks = 0
  var failedTasks = 0
  var killedTasks = 0
  var isExcluded = false

  var metrics = createMetrics(default = 0L)

  val peakExecutorMetrics = new ExecutorMetrics()

  override protected def doUpdate(): Any = {
    val info = new v1.ExecutorStageSummary(
      taskTime,
      failedTasks,
      succeededTasks,
      killedTasks,
      metrics.inputMetrics.bytesRead,
      metrics.inputMetrics.recordsRead,
      metrics.outputMetrics.bytesWritten,
      metrics.outputMetrics.recordsWritten,
      metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
      metrics.shuffleReadMetrics.recordsRead,
      metrics.shuffleWriteMetrics.bytesWritten,
      metrics.shuffleWriteMetrics.recordsWritten,
      metrics.memoryBytesSpilled,
      metrics.diskBytesSpilled,
      isExcluded,
      Some(peakExecutorMetrics).filter(_.isSet),
      isExcluded)
    new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
  }

}

在这里暴露它的更新方法doUpdate()。 再向上追溯我们可以看到再LiveEntity.scala中提供了对象的保存方法:


// LiveEntity.scala 
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
    // Always check triggers on the first write, since adding an element to the store may
    // cause the maximum count for the element type to be exceeded.
    store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
    lastWriteTime = now
  }

这里我们可以得到,metric的实际信息会最终保存到一个kv 存储中供前端调用。而这个实体类是如何更新的呢?跳转到该方法的引用位置,我们发现实际上该方法是被appStatusListener的监听方法调用。基于上一部分的探索,由此我们可以得出结论,metric的更新就是通过SparkListener的监听机制更新。

private[spark] class AppStatusListener(
    kvstore: ElementTrackingStore,
    conf: SparkConf,
    live: Boolean,
    appStatusSource: Option[AppStatusSource] = None,
    lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

    // Some code here...
}

在sparkUI上添加自定义的executor metric

有了上面的知识储备,我们总算能在spark UI 中自由加入自己的metric。由于前端数据的交互都是js实现,我们只需要定位到对应的js文件就能找到数据的展示位置。
我们以executorMetric为例,首先找到 executor对应的page,在executorspage.js中:

我们找到如下ajax函数:

$.get(createTemplateURI(appId, "executorspage"), function (template) { ...});

实际的数据展示就是以下面的方式展示:

{
              data: function (row, type) {
                var peakMemoryMetrics = row.peakMemoryMetrics;
                if (typeof peakMemoryMetrics !== 'undefined') {
                  if (type !== 'display')
                    return peakMemoryMetrics.DirectPoolMemory;
                  else
                    return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
                      formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
                } else {
                  if (type !== 'display') {
                    return 0;
                  } else {
                    return '0.0 B / 0.0 B';
                  }
                }
              }
            }

在前端展示则是在以下html元素:


<!-- in executorspage-template.html-->
<th>
  <span data-toggle="tooltip" data-placement="top"
    title="Peak direct byte buffer / memory-mapped buffer pool memory used by JVM. This refers to BufferPoolMXBean with form 'java.nio:type=BufferPool,name=direct' and 'java.nio:type=BufferPool,name=mapped'.">
              Peak Pool Memory Direct / Mapped
  </span>
</th>

可以看到这里直接 从后端数据中获取peakMemoryMetrics这个对象,那么数据返回格式什么呢?我们查询前端请求函数:

// executorspage.js
var endPoint = createRESTEndPointForExecutorsPage(appId);

// utils.js
function createRESTEndPointForExecutorsPage(appId) {
var words = getBaseURI().split('/');
var ind = words.indexOf("proxy");
var newBaseURI;
if (ind > 0) {
  appId = words[ind + 1];
  newBaseURI = words.slice(0, ind + 2).join('/');
  return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {
  appId = words[ind + 1];
  var attemptId = words[ind + 2];
  newBaseURI = words.slice(0, ind).join('/');
  if (isNaN(attemptId)) {
    return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
  } else {
    return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allexecutors";
  }
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}

发现请求的是“/allexecutors”,于是我们查找后端对应处理逻辑:

// OneApplicationResource.scala
  <at-mention handle="GET">@GET
</at-mention>  @Path("allexecutors")
  def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))

发现返回的格式就是我们之前查看到的ExecutorSummary类。至此,我们已经完成掌握了metric的前端展示逻辑。


那么我们如何添加自定义的metric信息呢?假设我们需要在executor页面上新增一个metric。

首先第一步:在api.scala 中的executorSummary类添加我们自己的metric对象

第二步:在appStausListener.scala 中 在我们想要更新metric的事件的监听逻辑加入我们自己的metric更新逻辑

第三步:在executorspage.js 中的data展示数组中添加一个对象,加入我们自己的数据展示逻辑

第四步:不要忘记在executorspage-template.html中heml元素来展示我们的数据。注意数据展示的顺序是数据的顺序,一定要和js中数组的数据保存顺序一致。


由此,我们就实现了在SparkUI中添加我们的自定义metric信息了。你学废了吗?

0
0
...
...
...
Avatar