| field | value | date |
|---|---|---|
| author | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-07-05 00:51:08 +0200 |
| committer | Arne Georg Gleditsch <argggh@lxr.linpro.no> | 2007-07-05 00:51:08 +0200 |
| commit | e9fa4c98bb5f084739d3418ade3f0c51e34a0aa1 (patch) | |
| tree | fec1d635625e031cde7cba1b0a1d95ee92ac760b /lib/LXRng/Index/PgBatch.pm | |
Rebase tree.
Diffstat (limited to 'lib/LXRng/Index/PgBatch.pm')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | lib/LXRng/Index/PgBatch.pm | 217 |
1 file changed, 217 insertions(+), 0 deletions(-)
```diff
diff --git a/lib/LXRng/Index/PgBatch.pm b/lib/LXRng/Index/PgBatch.pm
new file mode 100644
index 0000000..8f8844c
--- /dev/null
+++ b/lib/LXRng/Index/PgBatch.pm
@@ -0,0 +1,217 @@
+package LXRng::Index::PgBatch;
+
+# Specialized subclass of LXRng::Index::Pg for doing parallelized
+# batched inserts into database.  Higher performance (and higher
+# complexity).
+
+use strict;
+use DBI;
+use POSIX qw(:sys_wait_h);
+
+use base qw(LXRng::Index::Pg);
+
+
+sub transaction {
+    my ($self, $code) = @_;
+
+    if ($self->dbh->{AutoCommit}) {
+        $self->dbh->{AutoCommit} = 0;
+        $self->dbh->do(q(set constraints all deferred));
+        $code->();
+        $self->flush();
+        $self->dbh->commit();
+        $self->dbh->{AutoCommit} = 1;
+
+        # At end of outermost transaction, wait for outstanding flushes
+        $self->_flush_wait();
+    }
+    else {
+        # If we're in a transaction already, don't return to
+        # AutoCommit state.
+        $code->();
+
+        # Only occasional synchronization if we're inside another
+        # transaction.
+        if ($self->{'writes'}++ % 997 == 0) {
+            $self->flush();
+            $self->dbh->commit();
+        }
+    }
+}
+
+sub new {
+    my ($class, @args) = @_;
+
+    my $self = $class->SUPER::new(@args);
+    $$self{'writes'} = 0;
+
+    return $self;
+}
+
+sub flush {
+    my ($self) = @_;
+
+    return unless exists($$self{'cache'});
+
+    $self->_flush_wait();
+
+    my $pre = $self->prefix;
+    $self->dbh->commit() unless $self->dbh->{AutoCommit};
+    my $pid = fork();
+    die("fork failed: $!") unless defined($pid);
+    if ($pid == 0) {
+        $SIG{'INT'}  = 'IGNORE';
+        $SIG{'QUIT'} = 'IGNORE';
+        $SIG{'TERM'} = 'IGNORE';
+
+        my $i = 0;
+        $$self{'dbh'} = undef;
+        foreach my $table (qw(symbols identifiers usage)) {
+            if (exists($$self{'cache'}{$table})) {
+                $self->dbh->do(qq{copy $pre$table from stdin});
+                foreach my $l (@{$$self{'cache'}{$table}}) {
+                    $i++;
+                    $self->dbh->pg_putline($l);
+                }
+                $self->dbh->pg_endcopy;
+            }
+        }
+        $self->dbh->commit() unless $self->dbh->{AutoCommit};
+        $self->dbh->do(q(analyze)) if $i > 100000;
+        $self->dbh->disconnect();
+        warn "\n*** index: flushed $i rows\n";
+        kill(9, $$);
+    }
+    $$self{'flush_pid'} = $pid;
+    delete($$self{'cache'});
+    warn "\n*** index: flushing in background\n";
+}
+
+sub _flush_wait {
+    my ($self) = @_;
+
+    return unless $$self{'flush_pid'};
+    waitpid($$self{'flush_pid'}, WNOHANG); # Reap zombies
+    return unless kill(0, $$self{'flush_pid'});
+
+    warn "\n*** index: waiting for running flush to complete...\n";
+    $self->dbh->commit() unless $self->dbh->{AutoCommit};
+    waitpid($$self{'flush_pid'}, 0);
+}
+
+sub _cache {
+    my ($self, $name) = @_;
+
+    $$self{'cache'}{$name} ||= [];
+    return $$self{'cache'}{$name};
+}
+
+sub _cached_seqno {
+    my ($self, $seqname) = @_;
+
+    unless (exists($$self{'cached_seqno'}{$seqname}) and
+            $$self{'cached_seqno'}{$seqname}{'min'} <=
+            $$self{'cached_seqno'}{$seqname}{'max'})
+    {
+        my $dbh = $self->dbh;
+        my $pre = $self->prefix;
+        my $sth = $$self{'sth'}{'_cached_seqno'}{$seqname} ||=
+            $dbh->prepare(qq{select setval('$pre$seqname',
+                                           nextval('$pre$seqname')+1000)});
+        $sth->execute();
+        my ($id) = $sth->fetchrow_array();
+        $sth->finish();
+        $$self{'cached_seqno'}{$seqname}{'min'} = $id-1000;
+        $$self{'cached_seqno'}{$seqname}{'max'} = $id;
+    }
+
+    return $$self{'cached_seqno'}{$seqname}{'min'}++;
+}
+
+sub _prime_symbol_cache {
+    my ($self) = @_;
+
+    my $dbh = $self->dbh;
+    my $pre = $self->prefix;
+    my $sth = $$self{'sth'}{'_prime_symbol_cache'} ||=
+        $dbh->prepare(qq{select name, id from ${pre}symbols});
+    $sth->execute();
+    my %cache;
+    while (my ($name, $id) = $sth->fetchrow_array()) {
+        $cache{$name} = $id;
+    }
+    $sth->finish;
+
+    $$self{'__symbol_cache'} = \%cache;
+}
+
+sub _add_usage {
+    my ($self, $file_id, $line, $symbol_id) = @_;
+
+    push(@{$self->_cache('usage')}, "$file_id\t$symbol_id\t$line\n");
+
+    return 1;
+}
+
+sub _add_symbol {
+    my ($self, $symbol) = @_;
+
+    my $id = $self->_cached_seqno('symnum');
+    push(@{$self->_cache('symbols')}, "$id\t$symbol\n");
+
+    $self->_prime_symbol_cache()
+        unless exists $$self{'__symbol_cache'};
+
+    $$self{'__symbol_cache'}{$symbol} = $id;
+
+    return $id;
+}
+
+sub _add_ident {
+    my ($self, $rfile_id, $line, $sym_id, $type, $ctx_id) = @_;
+
+    $ctx_id = '\\N' unless defined($ctx_id);
+
+    my $id = $self->_cached_seqno('identnum');
+    push(@{$self->_cache('identifiers')}, join("\t", $id, $sym_id,
+                                               $rfile_id, $line, $type,
+                                               $ctx_id)."\n");
+
+    return $id;
+}
+
+my $_get_symbol_usage = 0;
+sub _get_symbol {
+    my ($self, $symbol) = @_;
+
+    unless (exists($$self{'__symbol_cache'})) {
+        # Only prime the cache once it's clear that we're likely to
+        # hit it a significant number of times.
+        return $self->SUPER::_get_symbol($symbol) if
+            $_get_symbol_usage++ < 100;
+
+        $self->_prime_symbol_cache();
+    }
+
+    return $$self{'__symbol_cache'}{$symbol} if
+        exists $$self{'__symbol_cache'}{$symbol};
+
+    return undef;
+}
+
+sub DESTROY {
+    my ($self) = @_;
+
+    if ($$self{'writes'} > 0) {
+        $self->flush();
+        $self->_flush_wait();
+    }
+
+    if ($$self{'dbh'}) {
+        $$self{'dbh'}->rollback() unless $$self{'dbh'}{'AutoCommit'};
+        $$self{'dbh'}->disconnect();
+        delete($$self{'dbh'});
+    }
+}
+
+1;
```
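The core of the batching approach above is `flush()`: buffered rows are handed to a forked child, which streams them into PostgreSQL with `COPY ... FROM STDIN` via DBD::Pg's `pg_putline`/`pg_endcopy`. The following standalone sketch shows that mechanism in isolation; the DSN, the `symbols` table layout, and the sample rows are illustrative assumptions, not part of the commit.

```perl
#!/usr/bin/perl
# Minimal sketch of the COPY-based bulk load used by flush().
# Assumed: a 'symbols' table with (id, name) columns and a local
# database named 'lxr' -- adjust DSN and schema as needed.
use strict;
use warnings;
use DBI;

my $dbh = DBI->connect('dbi:Pg:dbname=lxr', '', '',
                       { AutoCommit => 0, RaiseError => 1 });

# Rows are buffered as tab-separated lines, the same representation
# the module keeps in its in-memory cache.
my @rows = ("1\tmain\n", "2\tprintf\n");

# One COPY statement replaces one INSERT per row.
$dbh->do('copy symbols from stdin');
$dbh->pg_putline($_) for @rows;
$dbh->pg_endcopy;

$dbh->commit;
$dbh->disconnect;
```

In the module itself this runs in a forked child after the parent has committed, so indexing can continue while the copy proceeds; the parent only blocks on the child in `_flush_wait()` before starting the next flush or tearing down in `DESTROY`.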

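A second trick worth noting is `_cached_seqno()`: rather than paying one `nextval()` round trip per row, it advances the sequence by 1000 with a single `setval(nextval()+1000)` and hands out ids from that reserved block locally. Below is a minimal sketch of the same idea; the `symnum` sequence name is taken from the module, while the DSN and the helper name `reserve_ids` are assumptions for illustration.

```perl
use strict;
use warnings;
use DBI;

my $dbh = DBI->connect('dbi:Pg:dbname=lxr', '', '', { RaiseError => 1 });

# Reserve a block of ids from a sequence in one round trip.
sub reserve_ids {
    my ($dbh, $seq, $block) = @_;
    my ($max) = $dbh->selectrow_array(
        qq{select setval('$seq', nextval('$seq') + $block)});
    return ($max - $block, $max);    # ids from min up to max are now ours
}

my ($next, $max) = reserve_ids($dbh, 'symnum', 1000);
my $id = $next++;    # subsequent ids cost no further database round trips
```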