NovelEssay.com Programming Blog

Exploration of Big Data, Machine Learning, Natural Language Processing, and other fun problems.

Loading Wikipedia into ElasticSearch



This article gives instructions for loading Wikipedia articles into ElasticSearch. I did this on Windows, but all of these steps should work on any Java-friendly platform.
  1. Download ElasticSearch
  2. Download stream2es
  3. Download Wikipedia articles
  4. Start ElasticSearch
  5. Run stream2es


Download ElasticSearch

Download ElasticSearch from Elastic.co: https://www.elastic.co/downloads/elasticsearch
Unzip the download into a folder of your choice.

Download stream2es

Download the stream2es Java jar file here: http://download.elasticsearch.org/stream2es/stream2es
Optional: See stream2es on GitHub for options: https://github.com/elastic/stream2es

Download Wikipedia articles

Download the Wikipedia article archive here: https://dumps.wikimedia.org/enwiki/latest/
There are many files to choose from; the specific one I downloaded was enwiki-latest-pages-articles.xml.bz2
(It's over 12GB, so be sure you have plenty of disk space.)

Start ElasticSearch

I'm on Windows, so I opened a command window and ran this: 
c:\elasticsearch-1.5.2\bin\elasticsearch.bat
That starts up your local ElasticSearch instance at localhost:9200

Run stream2es

  • Move the stream2es file to your ElasticSearch bin folder. I put stream2es here c:\elasticsearch-1.5.2\bin\
  • Move the Wikipedia archive (enwiki-latest-pages-articles.xml.bz2) to your ElasticSearch bin folder too.
  • Run the stream2es java file: 
C:\elasticsearch-1.5.2\bin>java -jar stream2es wiki --target http://localhost:9200/mywiki --log debug --source /enwiki-latest-pages-articles.xml.bz2

Notes:
  1. You can change the "mywiki" to whatever you want your specific ElasticSearch index name to be.
  2. I had some trouble getting stream2es to find my Wikipedia archive path on Windows, but putting a / in front of the file name worked.



I ran all of this locally on my Windows desktop, and it took 6-8 hours. It appeared to lock up near the end, but it did eventually exit.

Now, you should have over 16 million Wikipedia articles loaded into your local ElasticSearch index. Enjoy.
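
One way to confirm the load finished is to ask ElasticSearch how many documents the index holds. The sketch below is not part of the original steps. The class and method names (WikiIndexCheck, CountUrl, FetchCount) are made up for illustration, the index name mywiki matches the command above, and it assumes the standard _count endpoint on a local node. In a Visual Studio project targeting .NET Framework you may need to add a reference to System.Net.Http.

```csharp
using System;
using System.Net.Http;

// Hypothetical helper for checking the document count of an index.
public static class WikiIndexCheck
{
    // Builds the URL of the _count endpoint for the given index,
    // e.g. http://localhost:9200/mywiki/_count
    public static string CountUrl(string host, string index)
    {
        return host.TrimEnd('/') + "/" + Uri.EscapeDataString(index) + "/_count";
    }

    // Sends a GET request to the _count endpoint and returns the raw JSON body.
    // Requires a running ElasticSearch node at the given host.
    public static string FetchCount(string host, string index)
    {
        using (var client = new HttpClient())
        {
            return client.GetStringAsync(CountUrl(host, index)).Result;
        }
    }
}
```

Calling WikiIndexCheck.FetchCount("http://localhost:9200", "mywiki") should return a small JSON document that includes a count field.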

I plan on doing future articles on using this Wikipedia data for machine learning, natural language processing, and topic clustering.

Vowpal Wabbit C# solution to Kaggle competition: Grupo Bimbo Inventory Demand

Kaggle Competition: Grupo Bimbo Inventory Demand

grupo-bimbo-inventory-demand

Vowpal Wabbit C# solution summary:

  1. Download data sets from the Kaggle competition site.
  2. Create a new Visual Studio 2015 solution and project.
  3. Install Vowpal Wabbit Nuget package to your Visual Studio project.
  4. Create a DataSource class to read the data set files.
  5. Create a VWRecord class with annotated properties.
  6. Create a VWWrapper class to manage the Vowpal Wabbit engine instance, training, and prediction API calls.
  7. Create a test program to manage training, prediction, evaluation jobs.
  8. Run test program and tinker until satisfied.


Download data sets from the Kaggle competition site.

Specifically here: grupo-bimbo-inventory-demand/data

You don't necessarily need to download the data to understand this article, but some of the data fields may be referenced later in this article. Review the field detail descriptions to give yourself some context.


Create a new Visual Studio 2015 solution and project.

I created a C# console application in Visual Studio to run my test projects. Nothing fancy about that.


Install Vowpal Wabbit Nuget package to your Visual Studio project.

Use the Nuget package manager in Visual Studio. Search for vowpal wabbit. Install the latest version.


Create a DataSource class to read the data set files.

This class opens the training (or test) file and reads it line by line. Each line is split into fields, which are mapped to properties by column position. The number of columns in a line tells the class which mapping to use: the training data has 11 columns and the test data has 7. This is not fancy, but it works.

    // Requires: using System; using System.Linq;
    public class DataSource
    {
        // Column counts for the training file and the test file
        private const int COLUMN_COUNT = 11;
        private const int COLUMN_COUNT_TEST = 7;

        // Current Record Attributes
        public DataRecord Record = new DataRecord();


        private string sourceReport;

        private System.IO.StreamReader fileReader;
        private int sourceIndex;

        public DataSource(string sourceReport)
        {
            this.sourceReport = sourceReport;
            Reset();
        }

        public bool NextRecord()
        {
            bool foundRecord = false;
            while (!fileReader.EndOfStream)
            {
                try
                {
                    //Processing row
                    string line = fileReader.ReadLine();
                    string[] fields = line.Split(',');

                    // Expect COLUMN_COUNT columns
                    if (fields.Count() != COLUMN_COUNT && fields.Count() != COLUMN_COUNT_TEST)
                    {
                        throw new Exception(string.Format("sourceReportParser column count [{0}] != expected COLUMN_COUNT [{1}]", fields.Count(), COLUMN_COUNT));
                    }

                    Record = new DataRecord();

                    if (fields.Count() == COLUMN_COUNT)
                    {
                        // Semana,Agencia_ID,Canal_ID,Ruta_SAK,Cliente_ID,Producto_ID,Venta_uni_hoy,Venta_hoy,Dev_uni_proxima,Dev_proxima,Demanda_uni_equil
                        Record.WeekId = fields[0];
                        Record.SalesDepotID = int.Parse(fields[1]);
                        Record.SalesChannelID = int.Parse(fields[2]);
                        Record.RouteID = int.Parse(fields[3]);
                        Record.Cliente_ID = int.Parse(fields[4]);
                        Record.Producto_ID = int.Parse(fields[5]);
                        Record.Venta_uni_hoy = fields[6];
                        Record.Venta_hoy = fields[7];
                        Record.Dev_uni_proxima = fields[8];
                        Record.Dev_proxima = fields[9];
                        Record.Demanda_uni_equil = fields[10];
                    }
                    else
                    {
                        //id,Semana,Agencia_ID,Canal_ID,Ruta_SAK,Cliente_ID,Producto_ID
                        Record.Id = fields[0];
                        Record.WeekId = fields[1];
                        Record.SalesDepotID = int.Parse(fields[2]);
                        Record.SalesChannelID = int.Parse(fields[3]);
                        Record.RouteID = int.Parse(fields[4]);
                        Record.Cliente_ID = int.Parse(fields[5]);
                        Record.Producto_ID = int.Parse(fields[6]);
                    }

                    sourceIndex++;

                    // Getting here means we have a good record. Break the loop.
                    foundRecord = true;
                    break;
                }
                catch (Exception e)
                {
                    Console.WriteLine("ERROR: NextRecord failed for line: " + sourceIndex + " with exception: " + e.Message + " Stack: " + e.StackTrace);
                    sourceIndex++;
                }
            }
            return foundRecord;
        }

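        public void Reset()
        {
            // Release any previously opened reader before reopening the file
            if (fileReader != null) fileReader.Dispose();
            fileReader = new System.IO.StreamReader(sourceReport);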
            // Burn column headers
            string line = fileReader.ReadLine();
            string[] fields = line.Split(',');
            if (fields.Count() != COLUMN_COUNT && fields.Count() != COLUMN_COUNT_TEST)
            {
                throw new Exception(string.Format("sourceReportParser column count [{0}] != expected COLUMN_COUNT [{1}]", fields.Count(), COLUMN_COUNT));
            }
            sourceIndex = 0;
        }

    }
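
The DataRecord class used by DataSource is not shown in this article. A minimal sketch, inferred only from how DataSource assigns to it, might look like the following. The member types are guesses based on those assignments (the ID columns are parsed to int, everything else is kept as the raw string), so treat it as an assumption rather than the original class.

```csharp
// Hypothetical reconstruction of DataRecord, inferred from DataSource.
public class DataRecord
{
    // Present only in the test file (the "id" column)
    public string Id { get; set; }

    // Semana: week number, kept as a string ("3" through "9" in training)
    public string WeekId { get; set; }

    // ID columns shared by the training and test files
    public int SalesDepotID { get; set; }    // Agencia_ID
    public int SalesChannelID { get; set; }  // Canal_ID
    public int RouteID { get; set; }         // Ruta_SAK
    public int Cliente_ID { get; set; }
    public int Producto_ID { get; set; }

    // Present only in the training file, kept as raw strings
    public string Venta_uni_hoy { get; set; }
    public string Venta_hoy { get; set; }
    public string Dev_uni_proxima { get; set; }
    public string Dev_proxima { get; set; }
    public string Demanda_uni_equil { get; set; }  // the label to predict
}
```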


Create a VWRecord class with annotated properties.

This class has a property for each of the fields in the test data except the Week ID. I found the Week ID more useful for splitting the training data than as a feature for training or prediction. Notice that each property's annotation puts it in a separate FeatureGroup. I tinkered with putting the properties in the same feature group, but that yielded worse results. Notice also the Enumerize = true annotations: the properties are integers, but they are ID values rather than quantities, so each distinct ID should be treated as its own category instead of as a numeric magnitude.

    // The Feature attribute should be in the VW.Serializer.Attributes namespace of the NuGet package.
    public class VWRecord
    {
        [Feature(FeatureGroup = 'h', Enumerize = true)]
        public int SalesDepotID { get; set; }

        [Feature(FeatureGroup = 'i', Enumerize = true)]
        public int SalesChannelID { get; set; }

        [Feature(FeatureGroup = 'j', Enumerize = true)]
        public int RouteID { get; set; }


        [Feature(FeatureGroup = 'k', Enumerize = true)]
        public int Cliente_ID { get; set; }

        [Feature(FeatureGroup = 'l', Enumerize = true)]
        public int Producto_ID { get; set; }
    }

Create a VWWrapper class to manage the Vowpal Wabbit engine instance, training, and prediction API calls.

The VW instance is created with a call to Init. Notice the several quadratic (-q) and cubic (--cubic) feature interactions specified. I originally tried --loss_function=squared but found much better results with --loss_function=quantile. Notice the commented-out lines in the Train function that let you view the serialized VW input strings. I found that useful for debugging and sanity checking.

    public class VWWrapper
    {
        VW.VowpalWabbit<VWRecord> vw = null;

        public void Init(bool train = true)
        {
            vw = new VW.VowpalWabbit<VWRecord>(new VowpalWabbitSettings {
                EnableStringExampleGeneration = true,
                Verbose = true,
                Arguments = string.Join(" "
                , "-f vw.model"
                //, "--loss_function=squared"
                , "--loss_function=quantile"
                , "--progress 100000"
                //, "--passes 2"
                //, "--cache_file vw.cache"
                , "-b 27"

                , "-q lk"
                , "-q lj"
                , "-q li"
                , "-q lh"

                , "--cubic hkl"
                , "--cubic ikl"
                , "--cubic jkl"

                , "--cubic hjl"
                //, "--cubic hil"
                , "--cubic hij"
                )
            });
            
        }

        public void Train(VWRecord record, float label)
        {
            // Comment this in if you want to see the VW serialized input records:
            //var str = vw.Serializer.Create(vw.Native).SerializeToString(record, new SimpleLabel() { Label = label });
            //Console.WriteLine(str);
            vw.Learn(record, new SimpleLabel() { Label = label });
        }

        public void SaveModel()
        {
            vw.Native.SaveModel();
        }

        public float Predict(VWRecord record)
        {
            return vw.Predict(record, VowpalWabbitPredictionType.Scalar);
        }
    }
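
To make the difference between the two loss functions concrete, here is a small sketch of the quantile (pinball) loss next to squared loss. It is an illustration, not Vowpal Wabbit's code: the class and method names are hypothetical, and tau = 0.5 is assumed because that is the default for VW's --quantile_tau option. At tau = 0.5 the quantile loss is half the absolute error, so one badly missed high-demand record costs far less than it would under squared loss. That is one plausible reason it worked better here, but it is an interpretation rather than something measured in this article.

```csharp
using System;

// Hypothetical illustration of the two loss functions mentioned above.
public static class LossSketch
{
    // Quantile (pinball) loss: under-predictions are weighted by tau,
    // over-predictions by (1 - tau).
    public static double Quantile(double prediction, double actual, double tau = 0.5)
    {
        double error = actual - prediction;
        return error >= 0 ? tau * error : (tau - 1) * error;
    }

    // Squared loss, for comparison.
    public static double Squared(double prediction, double actual)
    {
        double error = actual - prediction;
        return error * error;
    }
}
```

For a prediction of 3 against an actual demand of 103, LossSketch.Quantile gives 50 while LossSketch.Squared gives 10000.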

Create a test program to manage training, prediction, evaluation jobs.

If you created a Console Application (like I did), then you can set your Main up like this. There are two primary modes: train/evaluate and train/predict. Train/evaluate lets you measure your model against labeled data that was held out of training. Vowpal Wabbit also does a good job of reporting average loss on its own.


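Notice the "Median" collections. When VW predicts 0, we ignore that prediction and use a median value instead: the product's median demand if the product was seen in training, otherwise the median over all training records.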

        static string trainingSource = @"C:\kaggle\GrupoBimboInventoryDemand\train.csv";
        static string testSource = @"C:\kaggle\GrupoBimboInventoryDemand\test.csv";
        static string outputPredictions = @"C:\kaggle\GrupoBimboInventoryDemand\myPredictions.csv";

        static VWWrapper vw = new VWWrapper();

        static int trainingRecordCount = 0;
        static int testRecordCount = 0;
        static int evaluateRecordCount = 0;

        static Dictionary<int, float> productDemand = new Dictionary<int, float>();
        static Dictionary<int, int> productRecords = new Dictionary<int, int>();
        static Dictionary<int, List<float>> productDemandInstances = new Dictionary<int, List<float>>();
        static List<float> demandInstances = new List<float>();
        static Int64 totalDemand = 0;

        static Dictionary<int, float> productMedians = new Dictionary<int, float>();
        static float allProductsMedian = 0;

        static void Main(string[] args)
        {
            Stopwatch swTotal = new Stopwatch();
            swTotal.Start();

            vw.Init();

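            // true  = train without weeks 8 and 9, then evaluate on weeks 8 and 9
            // false = train on everything, then write predictions for the test file
            bool testOnly = true;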
            if(testOnly)
            {
                DoTraining(true);
                DoEvaluate();
            }
            else
            {
                DoTraining();
                DoPredictions();
            }

            // Save model here if you want to load and reuse it.
            // Current test app doesn't ever load/reuse it.
            //vw.SaveModel();

            swTotal.Stop();
            Console.WriteLine("Done. ElapsedTime: " + swTotal.Elapsed);
        }
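
        // Median of xs. Averages the two middle values when the count is even.
        // Expects at least one value; an empty array throws.
        static float Median(float[] xs)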
        {
            var ys = xs.OrderBy(x => x).ToList();
            double mid = (ys.Count - 1) / 2.0;
            return (ys[(int)(mid)] + ys[(int)(mid + 0.5)]) / 2;
        }
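
To see how the Median helper treats odd and even counts, here is a standalone copy with two worked inputs. The wrapper class name MedianDemo is made up for this example; the body is the same as above.

```csharp
using System.Linq;

// Standalone copy of the Median helper, for experimenting outside the test program.
public static class MedianDemo
{
    public static float Median(float[] xs)
    {
        var ys = xs.OrderBy(x => x).ToList();
        // For an odd count, mid is a whole number and both indexes are the same element.
        // For an even count, mid ends in .5 and the indexes are the two middle elements.
        double mid = (ys.Count - 1) / 2.0;
        return (ys[(int)(mid)] + ys[(int)(mid + 0.5)]) / 2;
    }
}
```

MedianDemo.Median(new float[] { 3, 1, 2 }) returns 2, and MedianDemo.Median(new float[] { 4, 1, 3, 2 }) returns 2.5.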


The DoTraining function loops through the DataSource records and maps the fields into a VWRecord. The VWRecord is sent to the Train function along with the known label. Each record's label is also saved into the product demand and instance collections so that the median demand values can be calculated later. If skipEightNine is set to true, the data for weeks 8 and 9 is not used for training; it is held out and used later for model evaluation.

        static void DoTraining(bool skipEightNine = false)
        {
            DataSource source = new DataSource(trainingSource);
            while (source.NextRecord())
            {
                if (skipEightNine && (source.Record.WeekId == "8" || source.Record.WeekId == "9")) continue;

                VWRecord vwRecord = new VWRecord();
                vwRecord.Cliente_ID = source.Record.Cliente_ID;
                vwRecord.Producto_ID = source.Record.Producto_ID;
                vwRecord.RouteID = source.Record.RouteID;
                vwRecord.SalesChannelID = source.Record.SalesChannelID;
                vwRecord.SalesDepotID = source.Record.SalesDepotID;

                float label = float.Parse(source.Record.Demanda_uni_equil);

                demandInstances.Add(label);
                // Cast directly; round-tripping the float through a string is fragile
                totalDemand += (Int64)label;
                if (productDemand.ContainsKey(source.Record.Producto_ID))
                {
                    productDemand[source.Record.Producto_ID] += label;
                    productRecords[source.Record.Producto_ID]++;
                    productDemandInstances[source.Record.Producto_ID].Add(label);
                }
                else
                {
                    productDemand.Add(source.Record.Producto_ID, label);
                    productRecords.Add(source.Record.Producto_ID, 1);
                    productDemandInstances.Add(source.Record.Producto_ID, new List<float>() { label });
                }

                trainingRecordCount++;

                vw.Train(vwRecord, label);
            }

            // Calculate medians
            foreach(var product in productDemandInstances.Keys)
            {
                productMedians.Add(product, Median(productDemandInstances[product].ToArray()));
            }
            allProductsMedian = Median(demandInstances.ToArray());
            // end median calculations

            Console.WriteLine("Train Complete. trainingRecordCount = " + trainingRecordCount);
        }


The DoEvaluate function loops through the DataSource records and only processes the Week 8 & 9 records. Each of those records is mapped to a VWRecord, and Vowpal Wabbit makes a prediction for it. If the prediction is 0, the median values are used as the prediction instead.


The difference between the logarithms of the predicted and actual values is accumulated and reported as the RMSLE value. RMSLE is the Root Mean Squared Logarithmic Error, which is the competition's evaluation metric:

https://www.kaggle.com/wiki/RootMeanSquaredLogarithmicError
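
Written out, the value that DoEvaluate reports is the following, where p_i is the prediction for record i, a_i is the actual demand, n is the number of evaluated records, and log is the natural logarithm (the symbol names are chosen here for illustration):

```latex
\mathrm{RMSLE} = \sqrt{\frac{1}{n} \sum_{i=1}^{n} \bigl( \log(p_i + 1) - \log(a_i + 1) \bigr)^2}
```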

        static void DoEvaluate()
        {
            double logDiffSquaredSum = 0;

            DataSource sourceEval = new DataSource(trainingSource);
            while (sourceEval.NextRecord())
            {
                if (sourceEval.Record.WeekId != "8" && sourceEval.Record.WeekId != "9") continue;

                VWRecord vwRecord = new VWRecord();

                vwRecord.Cliente_ID = sourceEval.Record.Cliente_ID;
                vwRecord.Producto_ID = sourceEval.Record.Producto_ID;
                vwRecord.RouteID = sourceEval.Record.RouteID;
                vwRecord.SalesChannelID = sourceEval.Record.SalesChannelID;
                vwRecord.SalesDepotID = sourceEval.Record.SalesDepotID;

                float actualLabel = float.Parse(sourceEval.Record.Demanda_uni_equil);

                float prediction = vw.Predict(vwRecord);

                if (prediction == 0)
                {
                    if (productDemand.ContainsKey(sourceEval.Record.Producto_ID))
                    {
                        prediction = productMedians[sourceEval.Record.Producto_ID];
                    }
                    else
                    {
                        prediction = allProductsMedian;
                    }
                }

                double logDiff = Math.Log(prediction + 1) - Math.Log(actualLabel + 1);
                logDiffSquaredSum += Math.Pow(logDiff, 2);


                evaluateRecordCount++;
            }
             
            Console.WriteLine("Evaluate Complete. evaluateRecordCount = " + evaluateRecordCount);
            Console.WriteLine("Evaluate RMSLE = " + Math.Pow(logDiffSquaredSum / evaluateRecordCount, 0.5));
            Console.WriteLine("Evaluate DONE!");
        }
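
If you want to sanity check the RMSLE arithmetic separately from the data files, the same calculation can be pulled out into a small function. This is a sketch with made-up names (RmsleSketch, Compute) that works on two arrays of equal length; it uses doubles throughout, so results can differ from DoEvaluate in the last few digits.

```csharp
using System;

// Hypothetical standalone version of the RMSLE calculation in DoEvaluate.
public static class RmsleSketch
{
    public static double Compute(double[] predictions, double[] actuals)
    {
        if (predictions.Length != actuals.Length || predictions.Length == 0)
        {
            throw new ArgumentException("Arrays must be non-empty and the same length.");
        }

        double logDiffSquaredSum = 0;
        for (int i = 0; i < predictions.Length; i++)
        {
            // Same per-record term as DoEvaluate: log(p + 1) - log(a + 1)
            double logDiff = Math.Log(predictions[i] + 1) - Math.Log(actuals[i] + 1);
            logDiffSquaredSum += logDiff * logDiff;
        }
        return Math.Sqrt(logDiffSquaredSum / predictions.Length);
    }
}
```

A perfect prediction scores 0. Predicting 1 when the actual demand is 0 scores log(2), which is about 0.693.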


After you have a model you like from the train & evaluate process, you will want to run train & predict. The DoPredictions function sets up the test data set as a DataSource, loops through the records, and calls Predict for each record. Whenever the prediction is 0, the median values are used as the prediction instead. The final output is written to a file with two columns: the test record ID and the predicted value.

        static void DoPredictions()
        {
            System.IO.File.WriteAllText(outputPredictions, "id,Demanda_uni_equil\n");

            StringBuilder predictionSB = new StringBuilder();

            DataSource sourceTest = new DataSource(testSource);
            while (sourceTest.NextRecord())
            {
                VWRecord vwRecord = new VWRecord();

                vwRecord.Cliente_ID = sourceTest.Record.Cliente_ID;
                vwRecord.Producto_ID = sourceTest.Record.Producto_ID;
                vwRecord.RouteID = sourceTest.Record.RouteID;
                vwRecord.SalesChannelID = sourceTest.Record.SalesChannelID;
                vwRecord.SalesDepotID = sourceTest.Record.SalesDepotID;

                testRecordCount++;
                if (testRecordCount % 100000 == 0)
                {
                    Console.WriteLine("sourceTest recordCount = " + testRecordCount);
                }

                float prediction = vw.Predict(vwRecord);

                if (prediction == 0)
                {
                    if (productDemand.ContainsKey(sourceTest.Record.Producto_ID))
                    {
                        prediction = productMedians[sourceTest.Record.Producto_ID];
                    }
                    else
                    {
                        prediction = allProductsMedian;
                    }
                }

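                // Format with the invariant culture so the decimal separator is always "."
                string outputLine = sourceTest.Record.Id + "," + prediction.ToString(System.Globalization.CultureInfo.InvariantCulture) + "\n";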
                predictionSB.Append(outputLine);
            }

            System.IO.File.AppendAllText(outputPredictions, predictionSB.ToString());

            Console.WriteLine("Predict Complete. recordCount =" + testRecordCount);
        }


Run test program and tinker until satisfied.

If you run this program exactly as it is, you'll get an RMSLE score of about 0.548 for train & evaluate. This yields a Kaggle competition score of 0.532. You can tinker with the features, feature spaces, cubic/quadratic features, and other Vowpal Wabbit engine configurations to try to improve your scores. I'm sure there are many ways to improve this particular solution, but hopefully this provides you with a good framework to use.