产业新知热门
加国央行再次疯狂加息 月供将暴涨 物价竟会这样变化 无人幸免的购物节“大逃杀” 兼职做自媒体这些天:有人年入五块四,有人时薪一百二 瞄准数亿过敏人群,以过敏数字疗法为切点,杭州数智医掘金新蓝海 武汉大学疑似出现霍乱病例 期货不严格止损是超短线交易失败的根源 ,精细赛道也能走到上市! 预制菜,会有“刺客”吗推荐资讯
让区块链变成人人可用的工具,上海原创Web3.0操作系统是如何诞生的 隐私之变|自我主张时代变革,从构建WEB3.0的ID体系开始 被投资圈盯上,风头超过元宇宙,Web3.0到底是啥? Web3.0,勾勒下一代互联网模样 为什么说中国汽车产业已经真正“支棱”起来了 我在新能源汽车行业打工10年:从月薪2千涨至年薪40万,终于熬出头 对话梅宏院士:数字化转型不是想不想,而是必须转 王兴继续“电商零售梦”:告别社区团购 美团优选变身明日达超市Iceberg小文件处理和读数流程分析
第一部分:Spark读取Iceberg流程分析 :
这个部分我们分析常规数据读取流程,不涉及到数据更新,删除等场景下的读取。
数据读取大概可以分为两个步骤
通过 Iceberg 的元数据 snapshot, manifest file 等解析出包含数据文件信息的 DataFile 对象
读取数据文件内容,把每行数据封装成 Spark 的 InternalRow 返回给引擎层
Spark 引擎层和 Iceberg 对接
根据 Spark 规范,如果要让 Spark 读取数据,需要实现以下几个接口。
Spark 在读取数据之前,需要为每个 Executor 分配数据文件,然后通过 Reader 读取数据文件。这两个接口都是在
org.apache.spark.sql.connector.read.Batch
实现的,创建 Batch 的步骤如下Iceberg 的 SparkTable 实现了
SupportsRead
的newScanBuilder
方法,创建出SparkScanBuilder
SparkScanBuilder
会创建出SparkBatchQueryScan
SparkBatchQueryScan 的 toBatch 方法创建出表示 批操作 的Batch 对象
通过
SparkBatchQueryScan
的 planInputPartitions 获取要读取的数据分片通过
SparkBatchQueryScan
, 生成 Reader 读取数据
重点看一下 Batch 接口的定义
//org.apache.spark.sql.connector.read.Batch;
public interface Batch { //表示一个输入分片 InputPartition[] planInputPartitions(); //为每个输入的文件创建 Reader PartitionReaderFactory createReaderFactory();}生成数据分片
先通过流程图,看一下涉及到的类
Iceberg 中的
SparkBatchQueryScan
同时实现了 Spark Scan 和 Batch 接口
首先 Spark 引擎会调用
SparkBatchQueryScan
的planInputPartitions
方法, 获取输入分片。SparkBatchQueryScan
表示普通的查询,SparkMergeScan 用在需要数据合并的场景下。planInputPartitions
方法先调用的 tasks() 方法获取到 CombinedScanTask,然后再封装成 ReadTask 返回给引擎。ReadTask 实现了 InputPartition接口,但 InputPartition 接口没有定义有用的方法,具体封装什么数据由 ReadTask 决定。
ReadTask 实际上封装的是 每个数据文件的 元信息,最终作为 Spark Reader 的输入。
// org.apache.iceberg.spark.source.SparkBatchScan
@Override public InputPartition[] planInputPartitions() { //生成 CombinedScanTask List在进行数据文件分片之前,已经由表名通过 Catalog 加载了表的 metadata.json 文件,生成baseTable。
通过 baseTable 创建出 TableScan,对应的实现类是
DataTableScan
, 同时指定了一些过滤条件,snapshot 的时间范围等,通过 TableScan 的子类来查找数据文件。protected List
TableScan的继承结构如下,通过继承关系,也可以发现 Iceberg 是支持增量读取的
DataTableScan
通过以下流程来生成输入分片由 planFiles() 获取所有需要读取的数据文件, 实际是委托给了 ManifestGroup 来操作
对大文件进行拆分,如果单个文件大小超过 SPLIT_SIZE,默认128M,并且文件格式支持切分,会对文件进行切分
将小文件打包在一起,如果文件比较小,比如只有1KB, 会将多个文件打包成一个输入分片,如果文件大小小于 SPLIT_OPEN_FILE_COST, 默认4M,会按照 SPLIT_OPEN_FILE_COST 来计算
//org.apache.iceberg.baseTableScan
public CloseableIterableFileScanTask 的继承关系如下,
FileScanTask 表示一个输入文件或者一个文件的一部分,在这里实现类是 baseFileScanTask
CombinedScanTask 表示多个 FileScanTask 组合在一起,实现类是
baseCombinedScanTask
由于定位文件需要 Manifest 等信息,先通过 snapshot.dataManifests() 读取当前 snapshot 的 manifestlist 文件,
解析出表示 ManifestFile 的对象 。
再构造出一个 ManifestGroup ,让 ManifestGroup 根据 manifest file 来获取输入的文件
//org.apache.iceberg.ManifestGroup
@Override public CloseableIterable由于不考虑删除等场景,所以获取 文件信息 的流程比较简单
通过 ManifestFile 对象 去读取所有的 ManifestFile 文件
通过 ManifestFile 解析出 DataFile 对象
如果有谓词下推,会对 DataFile 做过滤,进行文件级别裁剪
将符合条件的 DataFile 封装到 baseFileScanTask 中
public CloseableIterable
通过一系列操作,我们获取到了包含数据文件信息的 DataFile 对象,将其封装在 CombinedScanTask 进行返回
数据读取
在获取到要读取的数据文件信息后,Spark 会为每个任务分配数据分片,由 Executor 进行读取。
先看一下 Spark 定义的接口
org.apache.spark.sql.connector.read.PartitionReader
,很符合火山模型的定义,但 Spark 现在也支持向量化读取。RowDataReader 实现了
PartitionReader
接口 ,看一下 RowDataReader 继承关系。整个数据读取的核心逻辑 就是读取 Parquet 中的数据。
//org.apache.spark.sql.connector.read.PartitionReader
public interface PartitionReader
通过 ReaderFactory 创建出 PartitionReader,比较简单,需要注意一下 RowReader 的输入是 ReadTask,可以把ReadTask 理解成 Iceberg DataFile 对象的封装
static class ReaderFactory implements PartitionReaderFactory {
@Override public PartitionReaderbaseDataReader 实现了
PartitionReader
的 next 接口,之前讲过 CombinedScanTask 里面包含了多个文件,next 把具体的读操作再委托读每个文件// 代码有简化
abstract class baseDataReaderRowDataReader 会根据文件格式,使用对应的 Format Reader , 通过 newParquetIterable() 方法,这里返回的是
ParquetFileReade
。实际读取操作由Parquet 库提供的
org.apache.parquet.hadoop.ParquetFileReader
实现//org.apache.iceberg.spark.source.RowDataReader
protected CloseableIterable至此 数据读取的逻辑就分析完毕,核心就是去读文件,把每行数据封装成 Spark 的 InternalRow,返回给 Spark 引擎。
第二部分:Iceberg 解决小文件问题概览 :
如下是我们使用 Spark 写两次数据到 Iceberg 表的数据目录布局:
/data/hive/warehouse/default.db/iteblog
├── data│ └── ts_year=2020│ ├── id_bucket=0│ │ ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet│ │ ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet│ │ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet│ │ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet│ └── id_bucket=1│ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet│ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet└── metadata ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro5 directories, 13 files因为我们每次写入的数据就几条,Iceberg 每个分区写文件的时候都是产生新的文件,这就导致底层文件系统里面产生了很多大小才几KB的文件。如果我们是使用 Spark Streaming 的方式7*24小时不断地往 Apache Iceberg 里面写数据,这将产生大量的小文件。
使用 Iceberg 来压缩文件
值得高兴的是,Apache Iceberg 给我们提供了相关 Actions API 来合并这些小文件,具体如下:
Configuration conf = new Configuration();
conf.set(metaSTOREURIS.varname, "thrift://localhost:9083");Map运行完上面代码之后,可以将 Iceberg 的小文件进行合并,得到的新数据目录如下:
⇒ tree /data/hive/warehouse/default.db/iteblog
/data/hive/warehouse/default.db/iteblog├── data│ └── ts_year=2020│ ├── id_bucket=0│ │ ├── 00000-0-19603f5a-d38a-4106-aeb9-47285d15a6bd-00001.parquet│ │ ├── 00000-0-491c447b-e05f-40ac-8c32-44d5ef39353b-00001.parquet│ │ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00002.parquet│ │ ├── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00002.parquet│ │ └── 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet│ └── id_bucket=1│ ├── 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet│ ├── 00001-1-4181e872-94fa-4699-ab5d-81dffa92de0c-00001.parquet│ └── 00001-1-9b5f5291-af3b-4edc-adff-fbddf3e53709-00001.parquet└── metadata ├── 00000-0934ba0e-8ed9-48e3-9db2-25dca1f896f2.metadata.json ├── 00001-454ee707-dbc0-4fd8-8c64-0910d8d6315b.metadata.json ├── 00002-7bdd0e6b-ec2b-4b1a-b355-6a81a3f5a0ae.metadata.json ├── 00003-d987d15f-2c7c-427c-849e-b8842d77d28e.metadata.json ├── 2170dbc3-334e-41ad-ac2c-68dd7470f001-m0.avro ├── 22e1674c-71ea-4d84-8982-276c3370d5c0-m0.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m0.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m1.avro ├── 25126b97-5a87-42b7-b45a-499aa41e7359-m2.avro ├── snap-3634417817414108593-1-25126b97-5a87-42b7-b45a-499aa41e7359.avro ├── snap-5353330338887146702-1-2170dbc3-334e-41ad-ac2c-68dd7470f001.avro └── snap-6683043578305788250-1-22e1674c-71ea-4d84-8982-276c3370d5c0.avro5 directories, 20 files对比最新的结果可以得出:
ts_year=2020/id_bucket=0 新增了名为 00001-1-da4e85fa-096d-4b74-9b3c-a260e425385d-00001.parquet 的数据文件,这个其实就是把之前四个文件进行和合并得到的新文件;
ts_year=2020/id_bucket=1 新增了名为 00000-0-1d5871d5-08ac-4e43-a589-70a5d50dd4d2-00001.parquet 的数据文件,这个其实就是把之前两个文件进行和合并得到的新文件。
Iceberg 小文件合并原理
Iceberg 小文件合并是在 org.apache.iceberg.actions.RewriteDataFilesAction 类里面实现的。小文件合并其实是通过 Spark 并行计算的,这也就是上面 DEMO 初始化了一个 SparkSession 的原因。我们可以通过 RewriteDataFilesAction 类的 targetSizeInBytes 方法来设置输出的合并文件大小。
注意:合并最终的文件并不是都小于或等于 targetSizeInBytes,甚至会出现文件根本没合并的情况。
当我们调用了 execute() 方法,RewriteDataFilesAction 类会先创建出一个 org.apache.iceberg.DataTableScan,然后会把对应表的最新快照(Snapshot)拿出来,紧接着拿出这个快照对应的底层所有数据文件。然后按照分区 Key 进行分组(group),同一个分区的文件放到一起,并将这些信息放到 Map
由于分区里面可能存在一个文件,这时候就没必要去执行文件合并,这时候可以去掉这部分分区,得到了
Map
如果 filteredGroupedTasks 不为空,则对每个分区里面的文件进行 split 和 combine 操作,如下:
// Split and combine tasks under each partition
ListcombinedScanTasks 结构如下:
Apache iceberg write path 如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公众号:iteblog_hadoop combinedScanTasks 里面其实就是封装了 baseCombinedScanTask 类,这个类里面的 task 就是标识哪些 Iceberg 的数据文件需要合并到新文件里面。得到 combinedScanTasks 之后会构造出一个 RDD:
JavaRDD
然后最终会调用 taskRDD 的 map 方法,遍历 combinedScanTasks 里面的 task,将 task 里面对应的 Iceberg 读出来,再写到新文件里面:
public List
rewriteDataForTask 的实现如下:
private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
TaskContext context = TaskContext.get(); int partitionId = context.partitionId(); long taskId = context.taskAttemptId(); RowDataReader dataReader = new RowDataReader( task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive); SparkAppenderFactory appenderFactory = new SparkAppenderFactory( properties, schema, SparkSchemaUtil.convert(schema)); OutputFileFactory fileFactory = new OutputFileFactory( spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId); baseWriter writer; if (spec.fields().isEmpty()) { writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE); } else { writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema); } try { while (dataReader.next()) { InternalRow row = dataReader.get(); writer.write(row); } dataReader.close(); dataReader = null; return writer.complete(); } catch (Throwable originalThrowable) { ...... }}rewriteDataForTasks 执行完会返回新创建文件的路径,最后会写到新的快照里面。在快照里面会将新建的文件表示为 org.apache.iceberg.ManifestEntry.Status#ADDED,上一个快照里面的文件标记为 org.apache.iceberg.ManifestEntry.Status#DELETED。
更多相关内容
-
2022陕西科学仪器展,西安科博会,西部科博会
全新升级!2022第16届西安科博会招商火热进行中~ 由西安市人民政府主办的2022第16届西安科学技术产业博...
-
ai智能获客系统,大数据拓客系统,各行业高效获客必备!
大数据智能营销拓客系统,认证正版--鹰眼智客远程演示,同威:15538360637智能拓客系统,现在市面上的一...
-
OpenAI炼丹秘籍:教你学会训练大型神经网络
来源:新智元本文详细介绍了一些训练大型神经网络的相关技术及底层原理。 想知道那些超大规模神经网...
-
万象城PK武商 大悦城PK喜隆多 新老项目交锋谁更强
下半年新项目开始进入开业高峰期, 截至目前已有超50个项目官宣开业时间,预估会有300+项目于三四季度...
-
优酷:新增播后定级分账模式,剧集涵盖多个垂类题材|网播平台分账剧表现盘点(二)
00 前言 优酷自2018年开始布局分账剧领域,相继推出“优
-
元数据标准首发,解码数据中台能力
数字经济时代,数据中台逐渐成长为各行各业发展的基石。它既是企业内部可复用的技术平台,同时也是企业...
-
第七批集采结果出炉,齐鲁如何“杀疯了”
来源:药智网/乖扁豆 第七批集采,虽迟但到。本批国采纳入的61个药品品种中,超过10个品种有10家以...
-
远度+佰才邦:“小型复合翼无人机空中应急公专网基站”完成系统定型试验
近日,远度科技联合佰才邦研发的“ 小型复合翼无人机空中应急公专网基站”完成了系统定型试验:ZT-80V...
-
“楚天云飞”完成新一轮融资
证券时报10日消息,楚天科技披露增发购买资产预案,公司拟通过增发股份的方式,购买控股子公司楚天飞云...
-
如何通过tiktok带货,tiktok可以直播带货吗
1.关于选品:我有位朋友在TikTok上采用女性比较喜欢的产品,例如洗护类,就是用了直接有效果的那种。我...
推荐阅读