
Can't Populate Big Chunk Of Data To Mongodb Using Node.js

I am asked to import a big chunk of weather data collected from many sites all over the city. Each site has one computer with one folder, which is being synced to a central server.

Solution 1:

As Robbie said, streams are the way to go here: fs.createReadStream() should be used instead of fs.readFileSync(). I'd start by creating a line reader that takes a path and whatever string/regex you want to split on:

linereader.js

var fs = require("fs");
var util = require("util");
var EventEmitter = require("events").EventEmitter;

function LineReader(path, splitOn) {

    var readStream = fs.createReadStream(path);
    var self = this;
    var lineNum = 0;
    var buff = ""
    var chunk;

    readStream.on("readable", function() {

        while( (chunk = readStream.read(100)) !== null) {
            buff += chunk.toString();
            var lines = buff.split(splitOn);

            for (var i = 0; i < lines.length - 1; i++) {
                self.emit("line",lines[i]);
                lineNum += 1;
            }
            buff = lines[lines.length - 1];
        }
    });
    readStream.on("close", function() {
        self.emit("line", buff);
        self.emit("close")
    });
    readStream.on("error", function(err) {
        self.emit("error", err);
    })
}
util.inherits(LineReader, EventEmitter);
module.exports = LineReader;

This will read a text file and emit a "line" event for each line read, so you won't have them all in memory at once. Then, using the async package (or whatever async loop you prefer), loop through the files, inserting each document:

app.js

var LineReader = require("./linereader.js");
var async = require("async");

var paths = ["./text1.txt", "./text2.txt", "./path1/text3.txt"];
var reader;

async.eachSeries(paths, function(path, callback) {

    reader = new LineReader(path, /\n/g);

    reader.on("line", function(line) {
        var doc = turnTextIntoObject(line);
        db.collection("mycollection").insert(doc);
    })
    reader.on("close", callback);
    reader.on("error", callback);
}, function(err) {
    // handle error and finish;
})
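
One thing to keep in mind: the insert calls in the "line" handler above are fire-and-forget, so the per-file callback can fire before every document has actually been written. A minimal sketch of one way around that, assuming the same turnTextIntoObject helper and a driver version whose collection.insert accepts an array plus a callback, is to buffer a file's documents and do a single acknowledged insert on "close":

async.eachSeries(paths, function(path, callback) {

    var docs = [];
    var reader = new LineReader(path, /\n/g);

    reader.on("line", function(line) {
        docs.push(turnTextIntoObject(line));
    });
    reader.on("close", function() {
        // one insert per file; callback only fires once MongoDB has
        // acknowledged the write
        db.collection("mycollection").insert(docs, callback);
    });
    reader.on("error", callback);
}, function(err) {
    // handle error and finish;
});

This trades a little memory (one file's worth of documents at a time) for a single acknowledged write per file.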

Solution 2:

Try using streams instead of loading each file into memory.

I've sent you a pull request with an implementation using streams and async i/o.

This is most of it:

var Async = require('async');
var Csv = require('csv-streamify');
var Es = require('event-stream');
var Fs = require('fs');
var Mapping = require('./folder2siteRef.json');
var MongoClient = require('mongodb').MongoClient;

var sourcePath = '/hnet/incoming/' + new Date().getFullYear();

Async.auto({
  db: function (callback) {
    console.log('opening db connection');
    MongoClient.connect('mongodb://localhost:27017/test3', callback);
  },
  subDirectory: function (callback) {
    // read the list of subfolders, which are sites
    Fs.readdir(sourcePath, callback);
  },
  loadData: ['db', 'subDirectory', function (callback, results) {
    Async.each(results.subDirectory, load(results.db), callback);
  }],
  cleanUp: ['db', 'loadData', function (callback, results) {
    console.log('closing db connection');
    results.db.close(callback);
  }]
}, function (err) {
  console.log(err || 'Done');
});
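
The dependency arrays are what order the work here: each task names the tasks it needs, and Async.auto runs it as soon as those have finished, passing their results by key. A tiny hedged sketch with made-up task names (same callback-then-results signature as above):

Async.auto({
  getConfig: function (callback) {
    // no dependencies, so this runs immediately
    callback(null, { batchSize: 200 });
  },
  useConfig: ['getConfig', function (callback, results) {
    // runs only after getConfig, with its result available by key
    console.log(results.getConfig.batchSize);
    callback();
  }]
}, function (err) {
  console.log(err || 'Done');
});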

var load = function (db) {
  return function (directory, callback) {
    var basePath = sourcePath + '/' + directory;
    Async.waterfall([
      function (callback) {
        Fs.readdir(basePath, callback); // array of files in a directory
      },
      function (files, callback) {
        console.log('loading ' + files.length + ' files from ' + directory);
        Async.each(files, function (file, callback) {
          Fs.createReadStream(basePath + '/' + file)
            .pipe(Csv({objectMode: true, columns: true}))
            .pipe(transform(directory))
            .pipe(batch(200))
            .pipe(insert(db).on('end', callback));
        }, callback);
      }
    ], callback);
  };
};

var transform = function (directory) {
  return Es.map(function (data, callback) {
    data.siteRef = Mapping[directory];
    // TheTime looks like an Excel serial date (days since 1899-12-30;
    // 25569 is the serial value of 1970-01-01), so this converts it to
    // a unix epoch in seconds plus a six hour offset.
    data.epoch = parseInt((data.TheTime - 25569) * 86400) + 6 * 3600;
    callback(null, data);
  });
};

var insert = function (db) {
  return Es.map(
    function (data, callback) {
      if (data.length) {
        var bulk = db.collection('hnet').initializeUnorderedBulkOp();
        data.forEach(function (doc) {
          bulk.insert(doc);
        });
        bulk.execute(callback);
      } else {
        callback();
      }
    }
  );
};

var batch = function (batchSize) {
  batchSize = batchSize || 1000;
  var batch = [];

  return Es.through(
    function write (data) {
      batch.push(data);
      if (batch.length === batchSize) {
        this.emit('data', batch);
        batch = [];
      }
    },
    function end () {
      if (batch.length) {
        this.emit('data', batch);
        batch = [];
      }
      this.emit('end');
    }
  );
};
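
To see the batching stage on its own, here is a small sketch using event-stream's readArray/writeArray helpers (not part of the original script) to push a few items through batch() and collect the groups:

var Es = require('event-stream');

// five items through batch(2) should come out as [[1, 2], [3, 4], [5]]
Es.readArray([1, 2, 3, 4, 5])
  .pipe(batch(2))
  .pipe(Es.writeArray(function (err, batches) {
    console.log(batches);
  }));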

I've updated your tomongo.js script to use streams. I've also changed it to use async instead of sync for its file i/o.

I tested this against the structure defined in your code with small data sets and it worked really well. I did some limited testing against 3 directories with 900 files of 288 lines each. I'm not sure how big each row of your data is, so I threw a few random properties in. It's quite fast. See how it goes with your data. If it causes issues, you could try throttling it with a different write concern when executing the bulk insert operation.
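
For example, a hedged sketch of passing an explicit write concern when executing the bulk operation (option names assume the classic MongoDB Node.js driver; check your driver version's docs):

// wait for acknowledgement from the primary, with a timeout, before
// moving on to the next batch
bulk.execute({ w: 1, wtimeout: 5000 }, callback);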

Also check out some of these links for more information on streams in node.js:

http://nodestreams.com - a tool written by John Resig with many stream examples.

And event-stream, a very useful streams module.
