博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Python进行稳定可靠的文件操作
阅读量:6922 次
发布时间:2019-06-27

本文共 9842 字,大约阅读时间需要 32 分钟。

hot3.png

程序需要更新文件。虽然大部分程序员知道在执行I/O的时候会发生不可预期的事情,但是我经常看到一些异常幼稚的代码。在本文中,我想要分享一些如何在Python代码中改善I/O可靠性的见解。

考虑下述Python代码片段。对文件中的数据进行某些操作,然后将结果保存回文件中:

1
2
3
4
5
with
open
(filename) as f:
   
input
=
f.read()
output
=
do_something(
input
)
with
open
(filename,
'w'
) as f:
   
f.write(output)

看起来很简单吧?可能看起来并不像乍一看这么简单。我在产品服务器中调试应用,经常会出现奇怪的行为。

       

这是我看过的失效模式的例子:

  • 失控的服务器进程溢出大量日志,磁盘被填满。write()在截断文件之后抛出异常,文件将会变成空的。

  • 应用的几个实例并行执行。在各个实例结束之后,因为混合了多个实例的输出,文件内容最终变成了天书。

  • 在完成了写操作之后,应用会触发一些后续操作。几秒钟后断电。在我们重启了服务器之后,我们再一次看到了旧的文件内容。已经传递给其它应用的数据与我们在文件中看到的不再一致。

下面没有什么新的内容。本文的目的是为在系统编程方面缺少经验的Python开发者提供常见的方法和技术。我将会提供代码例子,使得开发者可以很容易的将这些方法应用到自己的代码中。

       

“可靠性”意味着什么?

广义的讲,可靠性意味着在所有规定的条件下操作都能执行它所需的函数。至于文件的操作,这个函数就是创建,替换或者追加文件的内容的问题。这里可以从数据库理论上获得灵感。经典的事务模型的ACID性质作为指导来提高可靠性。

开始之前,让我们先看看我们的例子怎样和ACID4个性质扯上关系:

  • 原子性(Atomicity)要求这个事务要么完全成功,要么完全失败。在上面的实例中,磁盘满了可能导致部分内容写入文件。另外,如果正当在写入内容时其它程序又在读取文件,它们可能获得是部分完成的版本,甚至会导致写错误

  • 一致性(Consistency) 表示操作必须从系统的一个状态到另一个状态。一致性可以分为两部分:内部和外部一致性。内部一致性是指文件的数据结构是一致的。外部一致性是指文件的内容与它相关的数据是相符合的。在这个例子中,因为我们不了解这个应用,所以很难推断是否符合一致性。但是因为一致性需要原子性,我们至少可以说没有保证内部一致性。

  • 隔离性(Isolation)如果在并发的执行事务中,多个相同的事务导致了不同的结果,就违反了隔离性。很明显上面的代码对操作失败或者其它都没有保护。

  • 持久性(Durability)意味着改变是持久不变的。在我们告诉用户成功之前,我们必须确保我们的数据存储是可靠的并且不只是一个写缓存。上面的代码已经成功写入数据的前提是假设我们调用write()函数,磁盘I/O就立即执行。但是POSIX标准是不保证这个假设的。

       

尽可能使用数据库系统

如果我们能够获得ACID 四个性质,那么我们增加可靠性方面取得了长远发展。但是这需要很大的编码功劳。为什么重复发明轮子?大多数数据库系统已经有ACID事务了。

可靠性数据存储已经是一个已解决的问题。如果你需要可靠性存储,请使用数据库。很可能,没有几十年的功夫,你自己解决这方面的能力没有那些已经专注这方面好些年的人好。如果你不想安装一个大数据库服务器,那么你可以使用,它具有ACID事务,很小,免费的,而且它包含在Python的中。

       

文章本该在这里就结束的,但是还有一些有根有据的原因,就是不使用数据。它们通常是文件格式或者文件位置约束。这两个在数据库系统中都不好控制。理由如下:

  • 我们必须处理其它应用产生的固定格式或者在固定位置的文件,

  • 我们必须为了其它应用的消耗而写文件(和应用了同样的限制条件)

  • 我们的文件必须方便人阅读或者修改。

...等等。你懂的。

如果我们自己动手实现可靠的文件更新,那么这里有一些编程技术供参考。下面我将展示四种常见的操作文件更新模式。在那之后,我会讨论采取哪些步骤在每个文件更新模式下满足ACID性质。

       

文件更新模式

文件可以以多种方式更新,但是我认为至少有四种常见的模式。这四种模式将做为本文剩余部分的基础。

截断-写

这可能是最基本的模式。在下述例子中,假设的域模型代码读数据,执行一些计算,然后以写模式重新打开存在的文件:

1
2
3
4
5
with
open
(filename,
'r'
) as f:
   
model.read(f)
model.process()
with
open
(filename,
'w'
) as f:
   
model.write(f)

此模式的一个变种以读写模式打开文件(Python中的“加”模式),寻找到开始的位置,显式调用truncate(),重写文件内容。

1
2
3
4
5
6
7
with
open
(filename,
'a+'
) as f:
   
f.seek(
0
)
   
model.
input
(f.read())
   
model.compute()
   
f.seek(
0
)
   
f.truncate()
   
f.write(model.output())

该变种的优势是只打开文件一次,始终保持文件打开。举例来说,这样可以简化加锁。

       

写-替换

另外一种广泛使用的模式是将新内容写到临时文件,之后替换原始文件:

1
2
3
4
5
with tempfile.NamedTemporaryFile(
      
'w'
,
dir
=
os.path.dirname(filename), delete
=
False
) as tf:
   
tf.write(model.output())
   
tempname
=
tf.name
os.rename(tempname, filename)

该方法与截断-写方法相比对错误更具有鲁棒性。请看下面对原子性和一致性的讨论。很多应用使用该方法。

这两个模式很常见,以至于linux内核中的ext4文件系统甚至可以自动,自动修复一些可靠性缺陷。但是不要依赖这一特性:你并不是总是使用ext4,而且管理员可能会关掉这一特性。

       

追加

第三种模式就是追加新数据到已存在的文件:

1
2
with
open
(filename,
'a'
) as f:
   
f.write(model.output())

这个模式用来写日志文件和其它累积处理数据的任务。从技术上讲,它的显著特点就是极其简单。一个有趣的扩展应用就是常规操作中只通过追加操作更新,然后定期重新整理文件,使之更紧凑。

       

Spooldir

这里我们将目录做为逻辑数据存储,为每条记录创建新的唯一命名的文件:

1
2
with
open
(unique_filename(),
'w'
) as f:
   
f.write(model.output())

该模式与附加模式一样具有累积的特点。一个巨大的优势是我们可以在文件名中放入少量元数据。举例来说,这可以用于传达处理状态的信息。spooldir模式的一个特别巧妙的实现是格式。maildirs使用附加子目录的命名方案,以可靠的、无锁的方式执行更新操作。和库为maildir操作提供了方便的封装。

如果你的文件名生成不能保证唯一的结果,甚至有可能要求文件必须实际上是新的。那么调用具有合适标志的低等级os.open():

1
2
3
fd
=
os.
open
(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL,
0o666
)
with os.fdopen(fd,
'w'
) as f:
   
f.write(...)

在以O_EXCL方式打开文件后,我们用os.fdopen将原始的文件描述符转化为普通的Python文件对象。

       

应用ACID属性到文件更新

下面,我将尝试加强文件更新模式。反过来让我们看看可以做些什么来满足ACID属性。我将会尽可能保持简单,因为我们并不是要写一个完整的数据库系统。请注意本节的材料并不彻底,但是可以为你自己的实验提供一个好的起点。

原子性

写-替换模式提供了原子性,因为底层的os.rename()。这意味着在任意给定时间点,进程或者看到旧的文件,或者看到新的文件。该模式对写错误具有天然的鲁棒性:如果写操作触发异常,重命名操作就不会被执行,所有就没有用损坏的新文件覆盖正确的旧文件的风险。

       

附加模 式并不是原子性的,因为有附加不完整记录的风险。但是有个技巧可以使更新具有原子性:为每个写操作标注校验和。之后读日志的时候,忽略所有没有有效校验和 的记录。以这种方式,只有完整的记录才会被处理。在下面的例子中,应用做周期性的测量,每次在日志中附加一行JSON记录。我们计算记录的字节表示形式的 CRC32校验和,然后附加到同一行:

1
2
3
4
5
6
with
open
(logfile,
'ab'
) as f:
    
for
i
in
range
(
3
):
        
measure
=
{
'timestamp'
: time.time(),
'value'
: random.random()}
        
record
=
json.dumps(measure).encode()
        
checksum
=
'{:8x}'
.
format
(zlib.crc32(record)).encode()
        
f.write(record
+
b
' '
+
checksum
+
b
'\n'
)

该例子代码通过每次创建随机值模拟测量。

1
2
3
4
$ cat log
{
"timestamp"
:
1373396987.258189
,
"value"
:
0.9360123151217828
}
9495b87a
{
"timestamp"
:
1373396987.25825
,
"value"
:
0.40429005476999424
}
149afc22
{
"timestamp"
:
1373396987.258291
,
"value"
:
0.232021160265939
} d229d937

想要处理这个日志文件,我们每次读一行记录,分离校验和,与读到的记录比较。

1
2
3
4
5
6
7
with
open
(logfile,
'rb'
) as f:
    
for
line
in
f:
        
record, checksum
=
line.strip().rsplit(b
' '
,
1
)
        
if
checksum.decode()
=
=
'{:8x}'
.
format
(zlib.crc32(record)):
            
print
(
'read measure: {}'
.
format
(json.loads(record.decode())))
        
else
:
            
print
(
'checksum error for record {}'
.
format
(record))

现在我们通过截断最后一行模拟被截断的写操作:

1
2
3
4
$ cat log
{
"timestamp"
:
1373396987.258189
,
"value"
:
0.9360123151217828
}
9495b87a
{
"timestamp"
:
1373396987.25825
,
"value"
:
0.40429005476999424
}
149afc22
{
"timestamp"
:
1373396987.258291
,
"value"
:
0.23202

当读日志的时候,最后不完整的一行被拒绝:

1
2
3
4
$ read_checksummed_log.py log
read measure: {
'timestamp'
:
1373396987.258189
,
'value'
:
0.9360123151217828
}
read measure: {
'timestamp'
:
1373396987.25825
,
'value'
:
0.40429005476999424
}
checksum error
for
record b
'{"timestamp": 1373396987.258291, "value":'

添加校验和到日志记录的方法被用于大量应用,包括很多数据库系统。

       

spooldir中的单个文件也可以在每个文件中添加校验和。另外一个可能更简单的方法是借用写-替换模式:首先将文件写到一边,然后移到最终的位置。设计一个保护正在被消费者处理的文件的命名方案。在下面的例子中,所有以.tmp结尾的文件都会被读取程序忽略,因此在写操作的时候可以安全的使用。

1
2
3
4
newfile
=
generate_id()
with
open
(newfile
+
'.tmp'
,
'w'
) as f:
   
f.write(model.output())
os.rename(newfile
+
'.tmp'
, newfile)

最后,截断-写是非原子性的。很遗憾我不能提供满足原子性的变种。在执行完截取操作后,文件是空的,还没有新内容写入。如果并发的程序现在读文件或者有异常发生,程序中止,我们既看不久的版本也看不到新的版本。

       

一致性

我谈论的关于原子性的大部分内容也可以应用到一致性。实际上,原子性更新是内部一致性的前提条件。外部一致性意味着同步更新几个文件。这不容易做到,锁文 件可以用来确保读写访问互不干涉。考虑某目录下的文件需要互相保持一致。常用的模式是指定锁文件,用来控制对整个目录的访问。

写程序的例子:

1
2
3
with
open
(os.path.join(dirname,
'.lock'
),
'a+'
) as lockfile:
   
fcntl.flock(lockfile, fcntl.LOCK_EX)
   
model.update(dirname)

读程序的例子:

1
2
3
with
open
(os.path.join(dirname,
'.lock'
),
'a+'
) as lockfile:
   
fcntl.flock(lockfile, fcntl.LOCK_SH)
   
model.readall(dirname)

该方法只有控制所有读程序才生效。因为每次只有一个写程序活动(独占锁阻塞所有共享锁),所有该方法的可扩展性有限。

       

更进一步,我们可以对整个目录应用写-替换模 式。这涉及为每次更新创建新的目录,更新完成后改变符合链接。举例来说,镜像应用维护一个包含压缩包和列出了文件名、文件大小和校验和的索引文件的目录。 当上流的镜像更新,仅仅隔离地对压缩包和索引文件进项原子性更新是不够的。相反,我们需要同时提供压缩包和索引文件以免校验和不匹配。为了解决这个问题, 我们为每次生成维护一个子目录,然后改变符号链接激活该次生成。

1
2
3
4
5
6
7
8
9
10
11
mirror
|
-
-
483
|   |
-
-
a.tgz
|   |
-
-
b.tgz
|   `
-
-
index.json
|
-
-
484
|   |
-
-
a.tgz
|   |
-
-
b.tgz
|   |
-
-
c.tgz
|   `
-
-
index.json
`
-
-
current
-
>
483

新的生成484正在被更新的过程中。当所有压缩包准备好,索引文件更新后,我们可以用一次原子调用os.symlink()来切换current符号链 接。其它应用总是或者看到完全旧的或者完全新的生成。读程序需要使用os.chdir()进入current目录,很重要的是不要用完整路径名指定文件。 否在当读程序打开current/index.json,然后打开current/a.tgz,但是同时符号链接已经改变时就会出现竞争条件。

       

隔离性

隔离性意味着对同一文件的并发更新是可串行化的——存在一个串行调度使得实际执行的并行调度返回相同的结果。“真实的”数据库系统使用像这种高级技术维护可串行性,同时允许高等级的可并行性。回到我们的场景,我们最后使用加锁来串行文件更新。

截断-写更新进行加锁是容易的。仅仅在所有文件操作前获取一个独占锁就可以。下面的例子代码从文件中读取一个整数,然后递增,最后更新文件:

1
2
3
4
5
6
7
8
def
update():
   
with
open
(filename,
'r+'
) as f:
      
fcntl.flock(f, fcntl.LOCK_EX)
      
n
=
int
(f.read())
      
n
+
=
1
      
f.seek(
0
)
      
f.truncate()
      
f.write(
'{}\n'
.
format
(n))

       

使用 写-替换模式加锁更新就有点儿麻烦啦。像 截断-写那样使用锁可能导致更新冲突。某个幼稚的实现可能看起来像这样:

1
2
3
4
5
6
7
8
9
10
def
update():
   
with
open
(filename) as f:
      
fcntl.flock(f, fcntl.LOCK_EX)
      
n
=
int
(f.read())
      
n
+
=
1
      
with tempfile.NamedTemporaryFile(
            
'w'
,
dir
=
os.path.dirname(filename), delete
=
False
) as tf:
         
tf.write(
'{}\n'
.
format
(n))
         
tempname
=
tf.name
      
os.rename(tempname, filename)

这段代码有什么问题呢?设想两个进程竞争更新某个文件。第一个进程运行在前面,但是第二个进程阻塞在fcntl.flock()调用。当第一个进程替换了 文件,释放了锁,现在在第二个进程中打开的文件描述符指向了一个包含旧内容的“幽灵”文件(任意路径名都不可达)。想要避免这个冲突,我们必须检查打开的 文件是否与fcntl.flock()返回的相同。所以我写了一个新的LockedOpen上下文管理器来替换内建的open上下文。来确保我们实际打开 了正确的文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class
LockedOpen(
object
):
 
    
def
__init__(
self
, filename,
*
args,
*
*
kwargs):
        
self
.filename
=
filename
        
self
.open_args
=
args
        
self
.open_kwargs
=
kwargs
        
self
.fileobj
=
None
 
    
def
__enter__(
self
):
        
f
=
open
(
self
.filename,
*
self
.open_args,
*
*
self
.open_kwargs)
        
while
True
:
            
fcntl.flock(f, fcntl.LOCK_EX)
            
fnew
=
open
(
self
.filename,
*
self
.open_args,
*
*
self
.open_kwargs)
            
if
os.path.sameopenfile(f.fileno(), fnew.fileno()):
                
fnew.close()
                
break
            
else
:
                
f.close()
                
f
=
fnew
        
self
.fileobj
=
f
        
return
f
 
    
def
__exit__(
self
, _exc_type, _exc_value, _traceback):
        
self
.fileobj.close()

1
2
3
4
5
6
7
8
9
def
update(
self
):
    
with LockedOpen(filename,
'r+'
) as f:
        
n
=
int
(f.read())
        
n
+
=
1
        
with tempfile.NamedTemporaryFile(
                
'w'
,
dir
=
os.path.dirname(filename), delete
=
False
) as tf:
            
tf.write(
'{}\n'
.
format
(n))
            
tempname
=
tf.name
        
os.rename(tempname, filename)

       

追加更新上锁如同给截断-写更新上锁一样简单:需要一个排他锁,然后追加就完成了。需要长期运行的会将文件长久的打开的进程,可以在更新时释放锁,让其它进入。

spooldir模式有个很优美的性质就是它不需要任何锁。此外,你建立在使用灵活的命名模式和一个健壮的文件名分代。就是一个spooldir模式的好例子。它可以很容易的适应其它情况,不仅仅是处理邮件。

       

持久性

持久性有点特殊,因为它不仅依赖于应用,也与OS和硬件配置有关。理论上来说,我们可以假定,如果数据没有到达持久存储,os.fsync()或 os.fdatasync()调用就没有返回结果。在实际情况中,我们有可能会遇到几个问题:我们可能会面对不完整的fsync实现,或者糟糕的磁盘控制 器配置,它们都无法提供任何持久化的保证。有一个来自  的讨论对哪里会发生错误进行了详尽的讨论。有些像PostgreSQL 之类的数据库系统,甚至提供了 ,以便管理员在运行时刻选择最佳的一个。然而不走运的人只能使用os.fsync(),并期待它可以被正确的实现。

       

通过截断-写模式,在结束写操作以后关闭文件以前,我们需要发送一个同步信号。注意通常这还牵涉到另一个层次的写缓存。glibc 缓存 甚至会在写操作传递到内核以前,在进程内部拦住它。同样为了得到空的glibc缓存,我们需要在同步以前对它flush():

1
2
3
4
with
open
(filename,
'w'
) as f:
   
model.write(f)
   
f.flush()
   
os.fdatasync(f)

要不,你也可以带参数-u调用Python,以此为所有的文件I/O获得未缓冲的写。

大多数时候相较os.fsync()我更喜欢os.fdatasync(),以此避免同步元数据的更新(所有权、大小、mtime…)。元数据的更新可最终导致磁盘I/O搜索操作,这会使整个过程慢不少。

       

写-替换风格更新使用同样的技巧只是成功了一半。我们得确保在代替旧文件之前,新写入文件的内容已经写入了非易失性存储器上了,但是替换操作怎么办?我们不能保证那个目录更新是否执行的刚刚好。在网络上有很多关于的长篇大论。但是在我们这种情况,旧文件和新文件都在同一个目录下,我们可以使用简单的解决方案来逃避这个这题。

1
2
3
4
os.rename(tempname, filename)
dirfd
=
os.
open
(os.path.dirname(filename), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)

我们调用底层的os.open()来打开目录(Python自带的open()方法不支持打开目录),然后在目录文件描述符上执行os.fsync()。

对待追加更新和我以及说过的截断-写是相似的。

spooldir模式与写-替换模式同样的目录同步问题。幸运地是,可以使用同样的解决方案:第一步同步文件,然后同步目录。

       

总结

这使可靠的更新文件成为可能。我已经演示了满足 ACID的四大性质。这些展示的实例代码充当一个工具箱。掌握这编程技术最大的满足你的需求。有时,你并不需要满足所有的ACID性质,可能仅仅需要一到 两个。我希望这篇文章可以帮助你去做已充分了解的决定,什么该去实现以及什么该舍弃。

转载于:https://my.oschina.net/zhangxc73912/blog/654818

你可能感兴趣的文章
Yii2---composer方法安装(Mac+MAMP)
查看>>
运维文章
查看>>
转-深入探讨Ruby与Python语法比较
查看>>
linux redhat6.5中 搭建NFS服务
查看>>
JMeter接口压力测试课程入门到高级实战(目录)
查看>>
extjs与struts开发的项目
查看>>
Linux20180423五周第四次课(4月23日)
查看>>
eclipse项目中,dubbo.xml不能识别schema报错
查看>>
MySQL中concat函数(连接字符串)
查看>>
[版本控制]原来Git分支都是这么用的
查看>>
浏览器兼容的获取event.offsetX的最简单方法
查看>>
JDK动态代理
查看>>
作为面试官,如何考察工程师的软素质
查看>>
怎么制作QQ动态表情包,GIF出处是哪
查看>>
万字长文概述NLP中的深度学习技术
查看>>
当量子计算遇到机器学习
查看>>
关于ScrollView can host only one direct child 问题
查看>>
Android WebView 缓存处理
查看>>
Require的使用 CMD
查看>>
为什么软件定制项目难做?软件外包公司该怎么发展?
查看>>