[EDP] 为 Spark Standalone 部署添加引擎

https://blueprints.launchpad.net/sahara/+spec/edp-spark-standalone

Spark 配置插件允许在 Sahara 中创建 Spark Standalone 集群。Sahara EDP 应该支持在这些集群上运行 Spark 应用程序。

问题描述

Spark 插件使用 Spark 的standalone 部署模式(而不是 Spark on YARN 或 Mesos)。必须创建一个 EDP 实现,使用操作系统提供的设施和 Spark Standalone 部署来支持基本的 EDP 功能。

基本的 EDP 功能是

  • run_job()

  • get_job_status()

  • cancel_job()

提议的变更

Sahara 作业管理器最近经过重构,允许配置插件根据集群和 job_type 选择或提供 EDP 引擎。插件可以返回自定义 EDP 作业引擎对象,或者选择 Sahara 提供的默认引擎。

可以向 Sahara 添加一个用于 Spark Standalone 集群的默认作业引擎,该引擎实现基本的 EDP 功能。

请注意,Spark 中没有公开的 API 用于作业状态或取消,除了可能通过在 Scala 程序中实例化的 SparkContext 对象提供的设施之外。但是,无需开发 Scala 程序即可提供基本功能。

  • 引擎选择标准

    Spark 配置插件必须确定是否可以使用默认 Spark EDP 引擎在特定集群上运行特定作业。要使用该引擎,必须满足以下条件

    • 默认引擎将使用 spark-submit 运行作业,因此集群必须至少具有 Spark 1.0.0 版本才能使用该引擎。

    • 作业类型必须是 Java(最初)

  • 通过 ssh 进行远程命令

    所有操作都应使用 ssh 运行远程命令来实现,而不是编写自定义代理和客户端。此外,任何长时间运行的命令都应异步运行。

  • run_job()

    run_job() 函数将使用 Spark 提供的 spark-submit 脚本实现。

    • spark-submit 必须使用客户端部署模式运行

    • spark-submit 命令将通过 ssh 执行。Ssh 必须立即返回,并且必须返回一个进程 ID (PID),该 PID 可用于检查状态或取消。这意味着该进程在后台运行。

    • 运行 spark-submit 的进程不得忽略 SIGINT。需要在此处小心,因为从 bash 后台运行的进程的默认行为是忽略 SIGINT。(可以通过从 ssh 启动的包装器中的子进程来处理此问题,该包装器首先恢复 SIGINT,然后启动包装器。在这种情况下,包装器必须确保将 SIGINT 传播到子进程)。

    • spark-submit 要求主应用程序 jar 位于运行它的节点上的本地存储中。支持的 jar 可以在本地存储或 hdfs 中。为了简单起见,所有 jar 都将上传到本地存储。

    • spark-submit 应该从主节点上的一个子目录中运行,该子目录位于一个众所周知的路径中。子目录命名应包含 Sahara 中的作业名称和作业执行 ID,以便于查找该目录。程序文件、输出和日志应写入此目录。

    • spark-submit 返回的退出状态必须写入工作目录中的一个文件。

    • stderrstdout 来自 spark-submit 应该被重定向并保存在工作目录中。这将有助于调试,并为像 SparkPi 这样的将结果写入 stdout 的应用程序保留结果。

    • Sahara 将允许用户指定 Spark 应用程序的参数。任何输入和输出数据源都将作为路径参数指定给 Spark 应用程序。

  • get_job_status()

    可以通过监控 run_job() 返回的 PID 通过 ps 并读取包含退出状态的文件来确定作业状态

    • 初始作业状态是 PENDING(所有 Sahara 作业都是如此)

    • 如果 PID 存在,则作业状态是 RUNNING

    • 如果 PID 不存在,请检查退出状态文件 - 如果退出状态为 0,则作业状态为 SUCCEEDED - 如果退出状态为 -2 或 130,则作业状态为 KILLED(由 SIGINT 杀死) - 对于任何其他退出状态,作业状态为 DONEWITHERROR

    • 如果作业在 Sahara 中失败(例如,由于异常),则作业状态将为 FAILED

  • cancel_job()

    可以通过向运行 spark-submit 的进程发送 SIGINT 来取消 Spark 应用程序。

    • cancel_job() 应该向 run_job() 返回的 PID 发送 SIGINT。如果 PID 是包装器的进程 ID,则包装器必须确保将 SIGINT 传播到子进程

    • 如果发送 SIGINT 的命令成功(例如,kill 返回 0),则 cancel_job() 应该调用 get_job_status() 来更新作业状态

替代方案

Ooyala 作业服务器是实现 Spark EDP 的另一种选择,但它本身就是一个 OpenStack 之外的项目,并引入了另一个依赖项。它必须由 Spark 配置插件安装,并且 Sahara 贡献者必须彻底了解它。

除了 Ooyala 之外,似乎没有现有的客户端或 API 用于处理作业提交、监控和取消。

数据模型影响

没有数据模型影响,但一些字段将被重用。

oozie_job_id 将存储一个 ID,该 ID 允许对正在运行的应用程序进行操作。该字段的名称将来应进行泛化。

job_execution.extra 字段可用于存储允许对正在运行的应用程序进行操作的额外信息

REST API 影响

其他最终用户影响

无。最初,Spark 作业(jar)可以使用 Java 作业类型运行。在某个时候,将添加一个特定的 Spark 作业类型(将在单独的规范中介绍)。

部署者影响

开发者影响

Sahara-image-elements impact

Sahara-dashboard / Horizon 影响

实现

负责人

Trevor McKay

工作项

  • 在 Spark 配置插件中实现默认 Spark 引擎选择

  • 实现 run

  • 实现 get_job_status

  • 实现 cancel

  • 实现启动包装器

  • 实现单元测试

依赖项

测试

将为 Sahara 中的更改添加单元测试。

Spark Standalone 集群的集成测试将在另一个蓝图和规范中添加。

文档影响

用户指南的弹性数据处理部分应说明运行 Spark 作业的能力以及任何限制。

参考资料